backend, and ensuring that writes to the cache are atomic.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
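The atomicity comes from writing each block under a uniquely numbered temporary key and renaming it to its final key only after the write (and any attribute writes) have completed, so a concurrent reader never sees a partially written cache entry. Below is a minimal sketch of that write-temp-then-rename pattern, assuming a simple file-backed cache; the names atomic_cache_write and temp_index are hypothetical illustrations, not part of the SSDDriver API, and a plain buffered write stands in for the driver's libaio path.

// Sketch only: write the payload to "<key>_<n>" first, then atomically rename it
// to "<key>", so a reader either sees the complete block or no block at all.
#include <atomic>
#include <cstdio>
#include <fstream>
#include <string>

static std::atomic<uint64_t> temp_index{0}; // hypothetical per-process counter for unique temp keys

bool atomic_cache_write(const std::string& dir, const std::string& key,
                        const std::string& data)
{
  const std::string temp_path  = dir + "/" + key + "_" + std::to_string(temp_index++);
  const std::string final_path = dir + "/" + key;

  {
    std::ofstream out(temp_path, std::ios::binary | std::ios::trunc);
    if (!out.write(data.data(), data.size())) {
      std::remove(temp_path.c_str()); // clean up the partial temp file on failure
      return false;
    }
  } // close the stream so the data is flushed before the rename

  // rename(2) is atomic within a filesystem: the final key appears fully written or not at all
  if (std::rename(temp_path.c_str(), final_path.c_str()) != 0) {
    std::remove(temp_path.c_str());
    return false;
  }
  return true;
}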
// Accumulate data from the backend store into rgw_get_obj_max_req_size-sized chunks, then write each chunk to the cache
if (write_to_cache) {
- const std::lock_guard l(d4n_get_data_lock);
rgw::d4n::CacheBlock block, existing_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
block.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address);
if (bl.length() > 0 && last_part) { // if bl (assigned from bl_rem) has data and this is the last part, write it to the cache
std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
block.blockID = ofs;
block.size = bl.length();
block.version = version;
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
- //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
}
}
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
}
}
}
block.blockID = ofs;
block.size = bl.length();
block.version = version;
- if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
}
}
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
}
}
}
if (bl_rem.length() == rgw_get_obj_max_req_size) {
std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
- if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
ofs += bl_rem.length();
block.blockID = ofs;
block.size = bl_rem.length();
}
}
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured while caching oid: " << oid << " error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
}
} else {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl;
uint64_t ofs = 0, len = 0;
bufferlist bl_rem;
bool last_part{false};
- std::mutex d4n_get_data_lock;
bool write_to_cache{true};
const DoutPrefixProvider* dpp;
optional_yield* y;
int r = 0;
bufferlist src = bl;
- r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location);
+ std::string temp_key = key + "_" + std::to_string(index++);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): temp key=" << temp_key << dendl;
+ r = op.prepare_libaio_write_op(dpp, src, len, temp_key, partition_info.location);
op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
op.cb->aio_sigevent.sigev_notify_attributes = nullptr;
op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get();
op.key = key;
+ op.temp_key = temp_key;
op.dpp = dpp;
op.priv_data = this;
op.attrs = std::move(attrs);
-
if (r >= 0) {
r = ::aio_write(op.cb.get());
} else {
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
auto op = std::move(p->user_data);
ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl;
+ int ret = -aio_error(op.cb.get());
+ boost::system::error_code ec;
+ if (ret < 0) {
+ ec.assign(-ret, boost::system::system_category());
+ ceph::async::dispatch(std::move(p), ec);
+ return;
+ }
int attr_ret = 0;
if (op.attrs.size() > 0) {
//TODO - fix yield_context
optional_yield y{null_yield};
- attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y);
+ attr_ret = op.priv_data->set_attrs(op.dpp, op.temp_key, op.attrs, y);
if (attr_ret < 0) {
- ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+ ldpp_dout(op.dpp, 0) << "ERROR: AsyncWriteRequest::libaio_write_yield_cb::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+ ec.assign(-attr_ret, boost::system::system_category());
+ ceph::async::dispatch(std::move(p), ec);
+ return;
}
}
efs::space_info space = efs::space(partition_info.location);
op.priv_data->set_free_space(op.dpp, space.available);
- const int ret = -aio_error(op.cb.get());
- boost::system::error_code ec;
+ std::string new_path = partition_info.location + op.key;
+ std::string old_path = partition_info.location + op.temp_key;
+
+ ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_yield_cb: temp_key: " << op.temp_key << dendl;
+
+ ret = rename(old_path.c_str(), new_path.c_str());
if (ret < 0) {
+ ret = -errno;
+ ldpp_dout(op.dpp, 0) << "ERROR: put::rename: failed to rename file: " << ret << dendl;
ec.assign(-ret, boost::system::system_category());
- } else if (attr_ret < 0) {
- ec.assign(-attr_ret, boost::system::system_category());
}
ceph::async::dispatch(std::move(p), ec);
}
Partition partition_info;
uint64_t free_space;
CephContext* cct;
+ inline static std::atomic<uint64_t> index{0};
struct libaio_read_handler {
rgw::Aio* throttle = nullptr;
struct AsyncWriteRequest {
const DoutPrefixProvider* dpp;
std::string key;
+ std::string temp_key;
void *data;
int fd;
unique_aio_cb_ptr cb;