#include "common/async/completion.h"
#include "common/errno.h"
+#include "common/async/blocked_completion.h"
#include "rgw_ssd_driver.h"
#if defined(__linux__)
#include <features.h>
int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
{
- bufferlist src = bl;
- std::string location = partition_info.location + key;
-
- ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl;
- FILE *cache_file = nullptr;
- int r = 0;
- size_t nbytes = 0;
-
- cache_file = fopen(location.c_str(), "w+");
- if (cache_file == nullptr) {
- ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl;
- return -errno;
- }
-
- nbytes = fwrite(src.c_str(), 1, len, cache_file);
- if (nbytes != len) {
- ldpp_dout(dpp, 0) << "ERROR: put::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl;
- return -EIO;
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
+ boost::system::error_code ec;
+ if (y) {
+ using namespace boost::asio;
+ spawn::yield_context yield = y.get_yield_context();
+ this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, yield[ec]);
+ } else {
+ this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, ceph::async::use_blocked[ec]);
}
-
- r = fclose(cache_file);
- if (r != 0) {
- ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl;
- return -errno;
+ if (ec) {
+ return ec.value();
}
-
- if (attrs.size() > 0) {
- r = set_attrs(dpp, key, attrs, y);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, r = " << r << dendl;
- return r;
- }
- }
-
- efs::space_info space = efs::space(partition_info.location);
- this->free_space = space.available;
-
return 0;
}
return p;
}
+template <typename Executor1, typename CompletionHandler>
+auto SSDDriver::AsyncWriteRequest::create(const Executor1& ex1, CompletionHandler&& handler)
+{
+ auto p = Completion::create(ex1, std::move(handler));
+ return p;
+}
+
template <typename ExecutionContext, typename CompletionToken>
auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token)
auto p = Op::create(ctx.get_executor(), init.completion_handler);
auto& op = p->user_data;
- int ret = op.init(dpp, location, read_ofs, read_len, p.get());
+ int ret = op.prepare_libaio_read_op(dpp, location, read_ofs, read_len, p.get());
if(0 == ret) {
ret = ::aio_read(op.aio_cb.get());
}
return init.result.get();
}
+template <typename ExecutionContext, typename CompletionToken>
+void SSDDriver::put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token)
+{
+ std::string location = partition_info.location + key;
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
+
+ using Op = AsyncWriteRequest;
+ using Signature = typename Op::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = Op::create(ctx.get_executor(), init.completion_handler);
+ auto& op = p->user_data;
+
+ int r = 0;
+ bufferlist src = bl;
+ r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location);
+ op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+ op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
+ op.cb->aio_sigevent.sigev_notify_attributes = nullptr;
+ op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get();
+ op.key = key;
+ op.dpp = dpp;
+ op.priv_data = this;
+ op.attrs = std::move(attrs);
+
+ if (r >= 0) {
+ r = ::aio_write(op.cb.get());
+ } else {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::prepare_libaio_write_op(), r=" << r << dendl;
+ }
+
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_write(), r=" << r << dendl;
+ if(r < 0) {
+ auto ec = boost::system::error_code{-r, boost::system::system_category()};
+ ceph::async::post(std::move(p), ec);
+ } else {
+ (void)p.release();
+ }
+ init.result.get();
+}
+
rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
off_t read_ofs, off_t read_len, const std::string& key) {
return [this, dpp, y, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable {
auto ex = get_associated_executor(init.completion_handler);
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
- this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
+ this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r}));
+ };
+}
+
+rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+ const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key) {
+ return [this, dpp, y, bl, len, attrs, key] (Aio* aio, AioResult& r) mutable {
+ ceph_assert(y);
+ ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl;
+
+ using namespace boost::asio;
+ spawn::yield_context yield = y.get_yield_context();
+ async_completion<spawn::yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
+
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
+ this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r}));
};
}
return aio->get(r_obj, ssd_cache_read_op(dpp, y, this, ofs, len, key), cost, id);
}
-void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
-{
- efs::space_info space = efs::space(partition_info.location);
- this->free_space = space.available;
-}
-
-int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs)
+rgw::AioResultList SSDDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id)
{
- ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl;
- bufferlist src = bl;
- struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp);
- int r = 0;
- if ((r = wr->prepare_libaio_write_op(dpp, src, len, key, partition_info.location)) < 0) {
- ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
- return r;
- }
- wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
- wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
- wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
- wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
- wr->key = key;
- wr->priv_data = this;
-
- if ((r = ::aio_write(wr->cb)) != 0) {
- ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl;
- delete wr;
- return r;
- }
- return 0;
+ rgw_raw_obj r_obj;
+ r_obj.oid = key;
+ return aio->get(r_obj, ssd_cache_write_op(dpp, y, this, bl, len, attrs, key), cost, id);
}
int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
{
std::string location = cache_location + key;
int r = 0;
-
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
- cb = new struct aiocb;
+ cb.reset(new struct aiocb);
+ memset(cb.get(), 0, sizeof(struct aiocb));
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
- memset(cb, 0, sizeof(struct aiocb));
- r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
+ r = fd = TEMP_FAILURE_RETRY(::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode));
if (fd < 0) {
ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
return r;
return r;
}
-void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
-{
- SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
- c->priv_data->libaio_write_completion_cb(c);
+void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
+ auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+ auto op = std::move(p->user_data);
+ ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl;
+ int attr_ret = 0;
+ if (op.attrs.size() > 0) {
+ //TODO - fix yield_context
+ optional_yield y{null_yield};
+ attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y);
+ if (attr_ret < 0) {
+ ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+ }
+ }
+
+ Partition partition_info = op.priv_data->get_current_partition_info(op.dpp);
+ efs::space_info space = efs::space(partition_info.location);
+ op.priv_data->set_free_space(op.dpp, space.available);
+
+ const int ret = -aio_error(op.cb.get());
+ boost::system::error_code ec;
+ if (ret < 0) {
+ ec.assign(-ret, boost::system::system_category());
+ } else if (attr_ret < 0) {
+ ec.assign(-attr_ret, boost::system::system_category());
+ }
+ ceph::async::dispatch(std::move(p), ec);
}
-int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
+int SSDDriver::AsyncReadOp::prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl;
aio_cb.reset(new struct aiocb);
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
- virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override;
+ virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
+ void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }
private:
Partition partition_info;
uint64_t free_space;
CephContext* cct;
- struct libaio_handler {
+ struct libaio_read_handler {
rgw::Aio* throttle = nullptr;
rgw::AioResult& r;
// read callback
}
};
+ struct libaio_write_handler {
+ rgw::Aio* throttle = nullptr;
+ rgw::AioResult& r;
+ // write callback
+ void operator()(boost::system::error_code ec) const {
+ r.result = -ec.value();
+ throttle->put(r);
+ }
+ };
+
// unique_ptr with custom deleter for struct aiocb
struct libaio_aiocb_deleter {
void operator()(struct aiocb* c) {
if(c->aio_fildes > 0) {
- if( ::close(c->aio_fildes) != 0) {
- }
+ TEMP_FAILURE_RETRY(::close(c->aio_fildes));
}
+ c->aio_buf = nullptr;
delete c;
}
};
template <typename ExecutionContext, typename CompletionToken>
auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token);
-
+
+ template <typename ExecutionContext, typename CompletionToken>
+ void put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token);
+
rgw::Aio::OpFunc ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
off_t read_ofs, off_t read_len, const std::string& key);
+ rgw::Aio::OpFunc ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+ const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);
+
using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
struct AsyncReadOp {
using Signature = void(boost::system::error_code, bufferlist);
using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
- int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
- static void libaio_cb_aio_dispatch(sigval sigval);
+ int prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+ static void libaio_cb_aio_dispatch(sigval sigval);
template <typename Executor1, typename CompletionHandler>
static auto create(const Executor1& ex1, CompletionHandler&& handler);
std::string key;
void *data;
int fd;
- struct aiocb *cb;
+ unique_aio_cb_ptr cb;
SSDDriver *priv_data;
+ rgw::sal::Attrs attrs;
+
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature, AsyncWriteRequest>;
- AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {}
int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location);
static void libaio_write_cb(sigval sigval);
- ~AsyncWriteRequest() {
- ::close(fd);
- cb->aio_buf = nullptr;
- delete(cb);
- }
+ template <typename Executor1, typename CompletionHandler>
+ static auto create(const Executor1& ex1, CompletionHandler&& handler);
};
-
- void libaio_write_completion_cb(AsyncWriteRequest* c);
};
} } // namespace rgw::cache