From 3efda44bde7430a833603f3bf00c52c87a286764 Mon Sep 17 00:00:00 2001
From: Samarah Uriarte 
Date: Fri, 23 Jun 2023 14:41:43 -0400
Subject: [PATCH] RGW: Add D4N chunking mechanism

Signed-off-by: Samarah Uriarte 
---
 src/rgw/driver/d4n/rgw_sal_d4n.cc | 272 ++++++++++++++++++-
 src/rgw/driver/d4n/rgw_sal_d4n.h  |  61 ++++-
 src/rgw/rgw_redis_driver.cc       | 432 +++++++++++++++++-------------
 src/rgw/rgw_redis_driver.h        |  78 ++++--
 4 files changed, 618 insertions(+), 225 deletions(-)

diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc
index 594bab15bc1..bd2b9011e86 100644
--- a/src/rgw/driver/d4n/rgw_sal_d4n.cc
+++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc
@@ -392,10 +392,202 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
   return ret;
 }
 
+void D4NFilterObject::D4NFilterReadOp::cancel() {
+  aio->drain();
+}
+
+int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
+  auto c = aio->wait();
+  while (!c.empty()) {
+    int r = flush(dpp, std::move(c));
+    if (r < 0) {
+      cancel();
+      return r;
+    }
+    c = aio->wait();
+  }
+  return flush(dpp, std::move(c));
+}
+
+int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+  int r = rgw::check_for_errors(results);
+
+  if (r < 0) {
+    return r;
+  }
+
+  std::list<bufferlist> bl_list;
+
+  auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+  results.sort(cmp); // merge() requires results to be sorted first
+  completed.merge(results, cmp); // merge results in sorted order
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::In flush:: " << dendl;
+
+  while (!completed.empty() && completed.front().id == offset) {
+    auto bl = std::move(completed.front().data);
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
+
+    bl_list.push_back(bl);
+    offset += bl.length();
+    int r = client_cb->handle_data(bl, 0, bl.length());
+    if (r < 0) {
+      return r;
+    }
+    completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+  }
+
+  return 0;
+}
+
 int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
         RGWGetDataCB* cb, optional_yield y) 
 {
-  /* Execute cache replacement policy */
+  const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
+  std::string oid = source->get_key().get_oid();
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+
+  this->client_cb = cb;
+  this->cb->set_client_cb(cb); // what's this for? -Sam
+
+  uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+  uint64_t start_part_num = 0;
+  uint64_t part_num = ofs/obj_max_req_size; //part num of ofs wrt start of the object
+  uint64_t adjusted_start_ofs = part_num*obj_max_req_size; //in case of ranged request, adjust the start offset to the beginning of a chunk/part
+  uint64_t diff_ofs = ofs - adjusted_start_ofs; //difference between actual offset and adjusted offset
+  off_t len = (end - adjusted_start_ofs) + 1;
+  uint64_t num_parts = (len%obj_max_req_size) == 0 ? len/obj_max_req_size : (len/obj_max_req_size) + 1; //calculate num parts based on adjusted offset
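The block above is the heart of the chunking scheme: part_num is the index of the chunk containing ofs, adjusted_start_ofs aligns the request down to a chunk boundary, diff_ofs is the offset into that first chunk, and num_parts is a ceiling division over the adjusted length. A standalone illustration of the same arithmetic (not part of the patch; it assumes a 4 MiB rgw_get_obj_max_req_size and an example 5-11 MiB range):

    #include <cstdint>
    #include <cstdio>

    int main() {
      const uint64_t obj_max_req_size = 4 * 1024 * 1024; // assumed chunk size
      uint64_t ofs = 5 * 1024 * 1024;  // example ranged-request start
      uint64_t end = 11 * 1024 * 1024; // example ranged-request end (inclusive)

      uint64_t part_num = ofs / obj_max_req_size;                // chunk index holding ofs
      uint64_t adjusted_start_ofs = part_num * obj_max_req_size; // align down to chunk start
      uint64_t diff_ofs = ofs - adjusted_start_ofs;              // offset within first chunk
      uint64_t len = (end - adjusted_start_ofs) + 1;
      uint64_t num_parts = (len % obj_max_req_size) == 0 ? len / obj_max_req_size
                                                         : (len / obj_max_req_size) + 1;

      // Prints: part_num=1 diff_ofs=1048576 num_parts=2
      std::printf("part_num=%llu diff_ofs=%llu num_parts=%llu\n",
                  (unsigned long long)part_num, (unsigned long long)diff_ofs,
                  (unsigned long long)num_parts);
      return 0;
    }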
+  //len_to_read is the actual length read from a part/chunk in cache, while part_len is the length of the chunk/part in cache
+  uint64_t cost = 0, len_to_read = 0, part_len = 0;
+
+  if (y) {
+    aio = rgw::make_throttle(window_size, y);
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size <<
+    " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+
+    this->offset = ofs;
+
+    do {
+      uint64_t id = adjusted_start_ofs;
+      if (start_part_num == (num_parts - 1)) {
+        len_to_read = len;
+        part_len = len;
+        cost = len;
+      } else {
+        len_to_read = obj_max_req_size;
+        cost = obj_max_req_size;
+        part_len = obj_max_req_size;
+      }
+      if (start_part_num == 0) {
+        len_to_read -= diff_ofs;
+        id += diff_ofs;
+      }
+
+      uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/chunk
+      std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+      rgw_raw_obj r_obj;
+      r_obj.oid = oid_in_cache;
+      ceph::bufferlist bl;
+
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
+      oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+      " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+      if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, part_len, bl, source->get_attrs()) == 0) {
+        // Read From Cache
+        auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
+                                  read_ofs, len_to_read, oid_in_cache), cost, id);
+
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+        auto r = flush(dpp, std::move(completed));
+
+        if (r < 0) {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+          return r;
+        }
+      } else {
+        //for ranged requests, for last part, the whole part might exist in the cache
+        oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+        r_obj.oid = oid_in_cache;
+
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
+        oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+        " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+        if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) {
+          // Read From Cache
+          auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
+                                    read_ofs, len_to_read, oid_in_cache), cost, id);
+
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+          auto r = flush(dpp, std::move(completed));
+
+          if (r < 0) {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+            return r;
+          }
+        } else {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+          auto r = drain(dpp);
+
+          if (r < 0) {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+            return r;
+          }
+
+          break;
+        }
+      }
+
+      if (start_part_num == (num_parts - 1)) {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+        return drain(dpp);
+      } else {
+        adjusted_start_ofs += obj_max_req_size;
+      }
+
+      start_part_num += 1;
+      len -= obj_max_req_size;
+    } while (start_part_num < num_parts);
+  }
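Each cached chunk is addressed by a key of the form <bucket_marker>_<oid>_<chunk_offset>_<chunk_len>, built inline above; note that the miss path retries the lookup with the full chunk length (obj_max_req_size) before draining and falling back to the backend, since the last chunk of a ranged request may exist in the cache as a full-sized chunk. A minimal sketch of the key layout (bucket and object names are hypothetical; not part of the patch):

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Chunk keys are "<bucket_marker>_<object_oid>_<chunk_offset>_<chunk_len>",
    // so a short final chunk and a full chunk at the same offset get distinct keys.
    std::string chunk_key(const std::string& bucket_marker, const std::string& oid,
                          uint64_t chunk_ofs, uint64_t chunk_len) {
      return bucket_marker + "_" + oid + "_" + std::to_string(chunk_ofs) + "_" +
             std::to_string(chunk_len);
    }

    int main() {
      // Prints: mybucket-marker_myobject_4194304_4194304
      std::cout << chunk_key("mybucket-marker", "myobject", 4194304, 4194304) << "\n";
      return 0;
    }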
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
+
+  Attrs obj_attrs;
+  if (source->has_attrs()) {
+    obj_attrs = source->get_attrs();
+  }
+
+  if (source->is_compressed() || obj_attrs.find(RGW_ATTR_CRYPT_MODE) != obj_attrs.end() || !y) {
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Skipping writing to cache" << dendl;
+    this->cb->bypass_cache_write();
+  }
+
+  if (start_part_num == 0) {
+    this->cb->set_ofs(ofs);
+  } else {
+    this->cb->set_ofs(adjusted_start_ofs);
+    ofs = adjusted_start_ofs; // redundant? -Sam
+  }
+
+  this->cb->set_ofs(ofs);
+  auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
+
+  if (r < 0) {
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
+    return r;
+  }
+
+  return this->cb->flush_last_part(dpp);
+
+  /*
+  / Execute cache replacement policy /
   int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(),
 	  source->driver->get_cache_driver());
@@ -410,12 +602,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   uint64_t len = end - ofs + 1;
   std::string oid(source->get_name());
 
-  /* Local cache check */
+  / Local cache check /
   if (source->driver->get_cache_driver()->key_exists(dpp, oid)) { // Entire object for now -Sam
     ret = source->driver->get_cache_driver()->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs());
     cb->handle_data(bl, ofs, len);
   } else {
-    /* Block directory check */
+    / Block directory check /
     int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block());
 
     if (getDirReturn >= -1) {
@@ -427,11 +619,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
         // remote cache get
 
-        /* Cache block locally */
+        / Cache block locally /
        ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
        if (!ret) {
-	  int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+	  int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
 
 	  if (updateValueReturn < 0) {
 	    ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
@@ -442,7 +634,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 	    cb->handle_data(bl, ofs, len);
 	  }
        } else {
-	  /* Write tier retrieval */
+	  / Write tier retrieval /
	  ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed."
	    << dendl;
 
	  getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
 
@@ -455,11 +647,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
	    // retrieve from write back cache, which will be stored as a cache driver instance in the filter
 
-	    /* Cache block locally */
+	    / Cache block locally /
	    ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
	    if (!ret) {
-	      int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+	      int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
 
	      if (updateValueReturn < 0) {
		ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
@@ -470,15 +662,15 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
		cb->handle_data(bl, ofs, len);
	      }
	  } else {
-	    /* Backend store retrieval */
+	    / Backend store retrieval /
	    ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
 
	    ret = next->iterate(dpp, ofs, end, cb, y);
 
	    if (!ret) {
-	      /* Cache block locally */
+	      / Cache block locally /
	      ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
-	      /* Store block in directory */
+	      / Store block in directory /
	      rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam
 
	      source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam
@@ -501,7 +693,63 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   if (ret < 0)
     ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl;
 
-  return next->iterate(dpp, ofs, end, cb, y);
+  return next->iterate(dpp, ofs, end, cb, y); */
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(const DoutPrefixProvider* dpp)
+{
+  save_dpp = dpp;
+  last_part = true;
+  return handle_data(bl_rem, 0, bl_rem.length());
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+  auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+
+  if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
+    auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
+  if (write_to_cache) {
+    const std::lock_guard l(d3n_get_data.d3n_lock);
+    Attrs attrs;
+
+    if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+
+      filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+    } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      ofs += bl_len;
+
+      filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+    } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
+      uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
+      uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
+      bufferlist bl_copy;
+
+      bl.splice(0, len_to_copy, &bl_copy);
+      bl_rem.claim_append(bl_copy);
+
+      if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+        std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+        ofs += bl_rem.length();
+
+        filter->get_cache_driver()->put(save_dpp, oid, bl_rem, bl_rem.length(), attrs); // need attrs for just chunk? -Sam
+
+        bl_rem.clear();
+        bl_rem = std::move(bl);
+      }
+    }
+  }
+
+  return 0;
 }
 
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
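handle_data() repacks arbitrarily sized bufferlists arriving from the backend into rgw_get_obj_max_req_size chunks before writing them to the cache, carrying any remainder in bl_rem until flush_last_part() drains it. A toy model of that repacking (not part of the patch; std::string stands in for bufferlist and the edge cases are simplified):

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
      const size_t chunk_size = 8; // stands in for rgw_get_obj_max_req_size
      std::string rem;             // stands in for bl_rem
      std::vector<std::string> cache_writes;

      for (std::string piece : {"abcde", "fghijklmn", "op"}) {
        // copy from the incoming piece until rem reaches chunk_size
        size_t space = chunk_size - rem.size();
        size_t take = std::min(space, piece.size());
        rem += piece.substr(0, take);
        piece.erase(0, take);

        if (rem.size() == chunk_size) {
          cache_writes.push_back(rem); // models put() to the cache driver
          rem = std::move(piece);      // leftover seeds the next chunk
        }
      }
      if (!rem.empty())
        cache_writes.push_back(rem);   // models flush_last_part() on the tail

      for (auto& c : cache_writes)
        std::cout << c << "\n";        // prints "abcdefgh" then "ijklmnop"
      return 0;
    }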
diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h
index 1345b8aa265..6150223d1e8 100644
--- a/src/rgw/driver/d4n/rgw_sal_d4n.h
+++ b/src/rgw/driver/d4n/rgw_sal_d4n.h
@@ -20,6 +20,7 @@
 #include "rgw_oidc_provider.h"
 #include "rgw_role.h"
 #include "common/dout.h"
+#include "rgw_aio_throttle.h"
 
 #include "rgw_redis_driver.h"
 #include "driver/d4n/d4n_directory.h"
@@ -104,15 +105,55 @@ class D4NFilterObject : public FilterObject {
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
-      D4NFilterObject* source;
-
-      D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
-								     source(_source) {}
-      virtual ~D4NFilterReadOp() = default;
-
-      virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
-      virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
-			  RGWGetDataCB* cb, optional_yield y) override;
+      public:
+	class D4NFilterGetCB: public RGWGetDataCB {
+	  private:
+	    D4NFilterDriver* filter; // don't need -Sam ?
+	    std::string oid;
+	    RGWGetDataCB* client_cb;
+	    uint64_t ofs = 0, len = 0;
+	    bufferlist bl_rem;
+	    bool last_part{false};
+	    D3nGetObjData d3n_get_data; // should make d4n version? -Sam
+	    bool write_to_cache{true};
+
+	  public:
+	    D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid) : filter(_filter),
+									  oid(_oid) {}
+
+	    const DoutPrefixProvider* save_dpp;
+
+	    int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+	    void set_client_cb(RGWGetDataCB* client_cb) { this->client_cb = client_cb;}
+	    void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+	    int flush_last_part(const DoutPrefixProvider* dpp);
+	    void bypass_cache_write() { this->write_to_cache = false; }
+	};
+
+	D4NFilterObject* source;
+
+	D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
+										   source(_source)
+	{
+	  std::string oid = source->get_bucket()->get_marker() + "_" + source->get_key().get_oid();
+	  cb = std::make_unique<D4NFilterGetCB>(source->driver, oid);
+	}
+	virtual ~D4NFilterReadOp() = default;
+
+	virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
+	virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+			    RGWGetDataCB* cb, optional_yield y) override;
+
+      private:
+	RGWGetDataCB* client_cb;
+	std::unique_ptr<D4NFilterGetCB> cb;
+	std::unique_ptr<rgw::Aio> aio;
+	uint64_t offset = 0; // next offset to write to client
+	rgw::AioResultList completed; // completed read results, sorted by offset
+
+	int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results);
+	void cancel();
+	int drain(const DoutPrefixProvider* dpp);
     };
 
     struct D4NFilterDeleteOp : FilterDeleteOp {
@@ -191,7 +232,7 @@ class D4NFilterWriter : public FilterWriter {
 		       const req_context& rctx, uint32_t flags) override;
    bool is_atomic() { return atomic; };
-   const DoutPrefixProvider* dpp() { return save_dpp; } 
+   const DoutPrefixProvider* dpp() { return save_dpp; }
 };
 
 } } // namespace rgw::sal
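The read op above hands each chunk read to an AIO throttle created by rgw::make_throttle(window_size, y); every read is charged its byte length as cost, and flush() returns data to the client strictly in offset order. A simplified, synchronous model of that windowing (not part of the patch; the real code completes reads asynchronously):

    #include <cstdint>
    #include <deque>
    #include <iostream>

    struct PendingRead { uint64_t id; uint64_t cost; };

    int main() {
      const uint64_t window_size = 16 * 1024 * 1024; // stands in for rgw_get_obj_window_size
      const uint64_t chunk = 4 * 1024 * 1024;        // stands in for rgw_get_obj_max_req_size
      uint64_t in_flight = 0;
      std::deque<PendingRead> pending;

      for (uint64_t ofs = 0; ofs < 10 * chunk; ofs += chunk) {
        while (in_flight + chunk > window_size) {   // throttle: wait for window space
          std::cout << "flush chunk at offset " << pending.front().id << "\n";
          in_flight -= pending.front().cost;        // oldest read "completes" first
          pending.pop_front();
        }
        pending.push_back({ofs, chunk});            // models aio->get(..., cost, id)
        in_flight += chunk;
      }
      while (!pending.empty()) {                    // models drain() at end of object
        std::cout << "flush chunk at offset " << pending.front().id << "\n";
        pending.pop_front();
      }
      return 0;
    }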
diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc
index 72c83980775..87c488dd4ea 100644
--- a/src/rgw/rgw_redis_driver.cc
+++ b/src/rgw/rgw_redis_driver.cc
@@ -1,5 +1,6 @@
 #include 
 #include "rgw_redis_driver.h"
+//#include "rgw_ssd_driver.h"
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_context g_ceph_context
@@ -21,7 +22,8 @@ std::vector<std::string> baseFields {
   "max_buckets",
   "data"};
 
-std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) {
+std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary)
+{
   std::vector< std::pair<std::string, std::string> > values;
   rgw::sal::Attrs::iterator attrs;
 
@@ -35,16 +37,37 @@ std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs*
   return values;
 }
 
-int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) {
+int RedisDriver::find_client(const DoutPrefixProvider* dpp)
+{
+  if (client.is_connected())
+    return 0;
+
+  if (addr.host == "" || addr.port == 0) {
+    dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+    return EDESTADDRREQ;
+  }
+
+  client.connect(addr.host, addr.port, nullptr);
+
+  if (!client.is_connected())
+    return ECONNREFUSED;
+
+  return 0;
+}
+
+int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len)
+{
   auto ret = entries.emplace(key, Entry(key, offset, len));
   return ret.second;
 }
 
-int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) {
+int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key)
+{
   return entries.erase(key);
 }
 
-std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) {
+std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key)
+{
   auto iter = entries.find(key);
 
   if (iter != entries.end()) {
@@ -54,7 +77,23 @@ std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::
   return std::nullopt;
 }
 
-int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
+/* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam
+int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight)
+{
+  auto iter = entries.find(key);
+
+  if (iter != entries.end()) {
+    iter->second.localWeight = localWeight;
+    return 0;
+  }
+
+  return -1;
+}
+*/
+
+int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp)
+{
+  this->cct = cct;
+
   addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
   addr.port = cct->_conf->rgw_d4n_port;
 
@@ -71,166 +110,8 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
   return 0;
 }
 
-bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
-  int result = -1;
-  std::string entryName = "rgw-object:" + key + ":cache";
-  std::vector<std::string> keys;
-  keys.push_back(entryName);
-
-  if (!client.is_connected())
-    return ECONNREFUSED;
-
-  try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {}
-
-  return result;
-}
-
-std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) {
-  std::vector<std::string> keys;
-  std::vector<Entry> entries;
-
-  if (!client.is_connected())
-    return {};
-
-  try {
-    size_t cursor = 0;
-    const std::string pattern = "*:cache";
-
-    do {
-      auto reply = client.scan(cursor, pattern);
-      client.sync_commit(std::chrono::milliseconds(1000));
-
-      auto arr = reply.get().as_array();
-      cursor = std::stoi(arr[0].as_string());
-      auto result = arr[1].as_array();
-
-      for (auto it = result.begin(); it != result.end(); ++it) {
-        int i = std::distance(result.begin(), it);
-        std::string entryName = result[i].as_string();
-        keys.push_back(entryName.substr(11, entryName.length() - 17));
-      }
-    } while (cursor != 0);
-  } catch(std::exception &e) {
-    return {};
-  }
-
-  /* Construct list of entries */
-  for (auto it = keys.begin(); it != keys.end(); ++it) {
-    Entry entry;
-
-    if (key_exists(dpp, *it)) {
-      try {
-        std::vector<std::string> fields;
-        std::string entryName = "rgw-object:" + *it + ":cache";
-
-        entry.key = *it;
-        fields.push_back("offset");
-        fields.push_back("len");
-        fields.push_back("localWeight");
-
-        client.hmget(entryName, fields, [&entry](cpp_redis::reply &reply) {
-          if (reply.is_array()) {
-            auto arr = reply.as_array();
-
-            if (!arr[0].is_null()) {
-              entry.offset = std::stol(arr[0].as_string().c_str());
-              entry.len = std::stoi(arr[1].as_string());
-              entry.localWeight = std::stoi(arr[2].as_string());
-            }
-          }
-        });
-
-        client.sync_commit(std::chrono::milliseconds(1000));
-      } catch(std::exception &e) {
-        return {}; // return failure or skip entry? -Sam
-      }
-    } else { // if one entry isn't found, shoud entire operation return a failure? -Sam
-      dout(20) << "RGW Redis Cache: Entry " << *it << " was not retrievable." << dendl;
-    }
-
-    entries.push_back(entry);
-  }
-
-  return entries;
-}
-
-size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) {
-  int result = -1;
-
-  if (!client.is_connected())
-    return ECONNREFUSED;
-
-  try {
-    client.keys(":cache", [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result < 0) {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return result;
-}
-
-Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) {
-  Partition part;
-  return part; // Implement -Sam
-}
-
-uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) {
-  int result = -1;
-
-  if (!client.is_connected())
-    return ECONNREFUSED;
-
-  try {
-    client.info([&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        int usedMem = -1;
-	int maxMem = -1;
-
-        std::istringstream iss(reply.as_string());
-	std::string line;
-        while (std::getline(iss, line)) {
-	  size_t pos = line.find_first_of(":");
-	  if (pos != std::string::npos) {
-	    if (line.substr(0, pos) == "used_memory") {
-	      usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-	    } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
-	      maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-	    }
-	  }
-        }
-
-        if (usedMem > -1 && maxMem > -1)
-	  result = maxMem - usedMem;
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return result;
-}
-
-int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) {
+int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
+{
   std::string entryName = "rgw-object:" + key + ":cache";
 
   if (!client.is_connected())
@@ -249,7 +130,7 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
 
     client.sync_commit(std::chrono::milliseconds(1000));
 
-    if (result != 0) {
+    if (result <= 0) {
      return -1;
     }
   } catch(std::exception &e) {
@@ -279,7 +160,8 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
   return 0;
 }
 
-int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { // for whole objects? -Sam
+int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs)
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -324,7 +206,13 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_
   return 0;
 }
 
-int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) {
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+{
+  return {};
+}
+
+int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data)
+{
   std::string result;
   std::string value = "";
   std::string entryName = "rgw-object:" + key + ":cache";
@@ -370,7 +258,8 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
   return 0;
 }
 
-int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) {
+int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key)
+{
   int result = 0;
   std::string entryName = "rgw-object:" + key + ":cache";
   std::vector<std::string> deleteField;
@@ -398,8 +287,8 @@ int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string&
   return result - 1;
 }
 
-int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
-  int exists = -2;
+int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -441,7 +330,7 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
   }
 
   getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */
-  
+  int exists = -1;
   /* Get attributes from cache */
   try {
     client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) {
@@ -475,7 +364,8 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
   return 0;
 }
 
-int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
   /* Creating the index based on oid */
   std::string entryName = "rgw-object:" + key + ":cache";
   std::string result;
@@ -509,7 +399,8 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key
   return 0;
 }
 
-int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -544,7 +435,8 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string&
   return 0;
 }
 
-int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) {
+int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs)
+{
   int result = 0;
   std::string entryName = "rgw-object:" + key + ":cache";
 
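All of the data and attribute operations above share one keying convention: a cached object lives in a Redis hash named rgw-object:<key>:cache whose fields hold the chunk data (the "data" field) alongside its attributes. A minimal cpp_redis sketch of reading such an entry back, mirroring the driver's own hmget/sync_commit usage (not part of the patch; the endpoint and the "mtime" field are illustrative):

    #include <cpp_redis/cpp_redis>
    #include <chrono>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
      cpp_redis::client client;
      client.connect("127.0.0.1", 6379, nullptr); // assumed local cache endpoint

      // One cached chunk == one Redis hash; the key shape mirrors the driver's
      // "rgw-object:" + key + ":cache" convention.
      std::string entryName = "rgw-object:mybucket-marker_myobject_0_4194304:cache";
      std::vector<std::string> fields = {"data", "mtime"}; // "mtime" is illustrative

      client.hmget(entryName, fields, [](cpp_redis::reply& reply) {
        if (reply.is_array()) {
          auto arr = reply.as_array();
          if (!arr[0].is_null())
            std::cout << "chunk size: " << arr[0].as_string().size() << " bytes\n";
        }
      });
      client.sync_commit(std::chrono::milliseconds(1000));
      return 0;
    }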
@@ -605,7 +497,8 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
   return -2;
 }
 
-std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) {
+std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name)
+{
   int exists = -2;
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
@@ -658,7 +551,8 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri
   return attrValue;
 }
 
-int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) {
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal)
+{
   /* Creating the index based on key */
   std::string entryName = "rgw-object:" + key + ":cache";
   int result = -1;
@@ -682,19 +576,185 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key,
   return result;
 }
 
-std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+{
+  return std::make_unique<RedisCacheAioRequest>(this);
+}
+
+bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key)
+{
+  int result = -1;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::vector<std::string> keys;
+  keys.push_back(entryName);
+
+  if (!client.is_connected())
+    find_client(dpp);
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp)
+{
+  std::vector<Entry> result;
+
+  for (auto it = entries.begin(); it != entries.end(); ++it) {
+    result.push_back(it->second);
+  }
+
+  return result;
+}
+
+size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp)
+{
+  return entries.size();
+}
+
+Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp)
+{
+  Partition part;
+  return part; // Implement -Sam
+}
+
+uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp)
+{
+  int result = -1;
+
+  if (!client.is_connected())
+    find_client(dpp);
+
+  try {
+    client.info([&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        int usedMem = -1;
+	int maxMem = -1;
+
+	std::istringstream iss(reply.as_string());
+	std::string line;
+	while (std::getline(iss, line)) {
+	  size_t pos = line.find_first_of(":");
+	  if (pos != std::string::npos) {
+	    if (line.substr(0, pos) == "used_memory") {
+	      usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+	    } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
+	      maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+	    }
+	  }
+	}
+
+	if (usedMem > -1 && maxMem > -1)
+	  result = maxMem - usedMem;
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result;
+}
+
+int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
 {
-  return std::make_unique<RedisCacheAioRequest>(this);
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
+  aio_cb.reset(new struct aiocb);
+  memset(aio_cb.get(), 0, sizeof(struct aiocb));
+  aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
+
+  if (aio_cb->aio_fildes < 0) {
+    int err = errno;
+    ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
+    return -err;
+  }
+
+  if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
+    posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
+  }
+
+  bufferptr bp(read_len);
+  aio_cb->aio_buf = bp.c_str();
+  result.append(std::move(bp));
+
+  aio_cb->aio_nbytes = read_len;
+  aio_cb->aio_offset = read_ofs;
+  aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+  aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
+  aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
+  aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+
+  return 0;
 }
 
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
 {
-  rgw_raw_obj r_obj;
-  r_obj.oid = key;
-  return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+  auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+  auto op = std::move(p->user_data);
+  const int ret = -aio_error(op.aio_cb.get());
+  boost::system::error_code ec;
+  if (ret < 0) {
+    ec.assign(-ret, boost::system::system_category());
+  }
+
+  ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+}
+
+template <typename Executor1, typename CompletionHandler>
+auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
+{
+  auto p = Completion::create(ex1, std::move(handler));
+  return p;
 }
 
-void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
-void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+template <typename ExecutionContext, typename CompletionToken>
+auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                off_t read_ofs, off_t read_len, CompletionToken&& token)
+{
+  std::string location = "";//partition_info.location + key;
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
+
+  using Op = AsyncReadOp;
+  using Signature = typename Op::Signature;
+  boost::asio::async_completion<CompletionToken, Signature> init(token);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
+
+  int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
+  if(0 == ret) {
+    ret = ::aio_read(op.aio_cb.get());
+  }
+  // ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+  /* if(ret < 0) {
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec, bufferlist{});
+  } else {
+    (void)p.release();
+  }*/
+  //return init.result.get();
+}
+
+void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+  using namespace boost::asio;
+  async_completion<spawn::yield_context, void()> init(y.get_yield_context());
+  auto ex = get_associated_executor(init.completion_handler);
+
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl;
+  cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r}));
+}
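AsyncReadOp::init() above arms a POSIX aiocb whose SIGEV_THREAD notification invokes libaio_cb_aio_dispatch(), which converts the aio_error() result into a boost error_code and dispatches the stored Completion; libaio_handler then feeds the AioResult back to the throttle. A self-contained sketch of the same aio_read()/SIGEV_THREAD pattern (not part of the patch; the file name and busy-wait are toy choices):

    #include <aio.h>
    #include <cstdio>
    #include <cstring>
    #include <fcntl.h>
    #include <unistd.h>

    // Runs on a notification thread, standing in for libaio_cb_aio_dispatch().
    static void on_complete(sigval sv) {
      auto* cb = static_cast<struct aiocb*>(sv.sival_ptr);
      std::printf("read %zd bytes\n", aio_return(cb));
    }

    int main() {
      int fd = ::open("/etc/hostname", O_RDONLY | O_CLOEXEC); // any readable file
      if (fd < 0) return 1;

      char buf[128];
      struct aiocb cb;
      std::memset(&cb, 0, sizeof(cb));
      cb.aio_fildes = fd;
      cb.aio_buf = buf;
      cb.aio_nbytes = sizeof(buf);
      cb.aio_offset = 0;
      cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
      cb.aio_sigevent.sigev_notify_function = on_complete;
      cb.aio_sigevent.sigev_value.sival_ptr = &cb;

      if (::aio_read(&cb) != 0) return 1;
      while (aio_error(&cb) == EINPROGRESS)
        ::usleep(1000); // toy wait; the driver dispatches a Completion instead
      ::close(fd);
      return 0;
    }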
+
+void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+}
 
-} } // namespace rgw::cal
+} } // namespace rgw::cache
diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h
index af7c34a299d..1f8ea953b94 100644
--- a/src/rgw/rgw_redis_driver.h
+++ b/src/rgw/rgw_redis_driver.h
@@ -1,6 +1,8 @@
 #ifndef CEPH_REDISDRIVER_H
 #define CEPH_REDISDRIVER_H
 
+#include <aio.h>
+#include "common/async/completion.h"
 #include <string>
 #include <iostream>
 #include <cpp_redis/cpp_redis>
@@ -13,26 +15,16 @@ namespace rgw { namespace cache {
 
 class RedisDriver;
 
 class RedisCacheAioRequest: public CacheAioRequest {
-public:
-  RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
-  virtual ~RedisCacheAioRequest() = default;
-  virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-  virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-private:
-  RedisDriver* cache_driver;
+  public:
+    RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
+    virtual ~RedisCacheAioRequest() = default;
+    virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+    virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+  private:
+    RedisDriver* cache_driver;
 };
 
 class RedisDriver : public CacheDriver {
-  private:
-    cpp_redis::client client;
-    rgw::d4n::Address addr;
-    std::unordered_map<std::string, Entry> entries;
-
-    int find_client(const DoutPrefixProvider* dpp);
-    int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
-    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
-    std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
-
   public:
     RedisDriver() : CacheDriver() {}
 
@@ -60,6 +52,58 @@ class RedisDriver : public CacheDriver {
     virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
 
     virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
+
+    struct libaio_handler { // should this be the same as SSDDriver? -Sam
+      rgw::Aio* throttle = nullptr;
+      rgw::AioResult& r;
+
+      // read callback
+      void operator()(boost::system::error_code ec, bufferlist bl) const {
+        r.result = -ec.value();
+        r.data = std::move(bl);
+        throttle->put(r);
+      }
+    };
+    template <typename ExecutionContext, typename CompletionToken>
+    auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                   off_t read_ofs, off_t read_len, CompletionToken&& token);
+
+  private:
+    cpp_redis::client client;
+    rgw::d4n::Address addr;
+    std::unordered_map<std::string, Entry> entries;
+    CephContext* cct;
+
+    int find_client(const DoutPrefixProvider* dpp);
+    int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
+    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
+    std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
+
+    // unique_ptr with custom deleter for struct aiocb
+    struct libaio_aiocb_deleter {
+      void operator()(struct aiocb* c) {
+        if(c->aio_fildes > 0) {
+          if( ::close(c->aio_fildes) != 0) {
+          }
+        }
+        delete c;
+      }
+    };
+
+    using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
+
+    struct AsyncReadOp {
+      bufferlist result;
+      unique_aio_cb_ptr aio_cb;
+      using Signature = void(boost::system::error_code, bufferlist);
+      using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
+
+      int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+      static void libaio_cb_aio_dispatch(sigval sigval);
+
+      template <typename Executor1, typename CompletionHandler>
+      static auto create(const Executor1& ex1, CompletionHandler&& handler);
+    };
 };
 
 } } // namespace rgw::cal
-- 
2.39.5