#define LIBRADOS_ASIO_H
#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/associator.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/execution/executor.hpp>
template <typename Result>
struct Invoker {
using Signature = void(boost::system::error_code, version_t, Result);
+ using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t, Result);
Result result;
template <typename Completion>
- void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
- ceph::async::dispatch(std::move(completion), ec, ver, std::move(result));
+ void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion,
+ boost::system::error_code ec, version_t ver) {
+ ceph::async::dispatch(std::move(completion),
+ std::move(aio_completion),
+ ec, ver, std::move(result));
}
};
// specialization for Result=void
template <>
struct Invoker<void> {
using Signature = void(boost::system::error_code, version_t);
+ using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t);
template <typename Completion>
- void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
- ceph::async::dispatch(std::move(completion), ec, ver);
+ void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion,
+ boost::system::error_code ec, version_t ver) {
+ ceph::async::dispatch(std::move(completion),
+ std::move(aio_completion),
+ ec, ver);
}
};
+template <typename Handler>
+struct AsyncHandler {
+ Handler handler;
+
+ AsyncHandler(Handler&& h) : handler(std::move(h)) {}
+
+ template <typename ...Args>
+ void operator()(unique_aio_completion_ptr aio_completion, Args&& ...args) {
+ aio_completion.reset();
+ std::move(handler)(std::forward<Args>(args)...);
+ }
+};
+
+} // namespace detail
+} // namespace librados
+
+namespace boost::asio {
+// forward associations from wrapped handler
+template <template<typename, typename> class Associator,
+ typename Handler, typename DefaultCandidate>
+struct associator<Associator, ::librados::detail::AsyncHandler<Handler>, DefaultCandidate>
+ : Associator<Handler, DefaultCandidate>
+{
+ static auto get(const ::librados::detail::AsyncHandler<Handler>& h) noexcept {
+ return Associator<Handler, DefaultCandidate>::get(h.handler);
+ }
+
+ static auto get(const ::librados::detail::AsyncHandler<Handler>& h,
+ const DefaultCandidate& c) noexcept {
+ return Associator<Handler, DefaultCandidate>::get(h.handler, c);
+ }
+};
+} // namespace boost::asio
+
+namespace librados {
+namespace detail {
+
template <typename Result>
struct AsyncOp : Invoker<Result> {
unique_aio_completion_ptr aio_completion;
boost::asio::cancellation_slot slot;
using Signature = typename Invoker<Result>::Signature;
- using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
+ using InnerSignature = typename Invoker<Result>::InnerSignature;
+ using Completion = ceph::async::Completion<InnerSignature, AsyncOp<Result>>;
static void aio_dispatch(completion_t cb, void *arg) {
// reclaim ownership of the completion
if (ret < 0) {
ec.assign(-ret, librados::detail::err_category());
}
- op.dispatch(std::move(p), ec, ver);
+ op.dispatch(std::move(p), std::move(op.aio_completion), ec, ver);
}
struct op_cancellation {
cancel_handler = &slot.template emplace<op_cancellation>();
}
- auto p = Completion::create(ex1, std::move(handler));
+ auto p = Completion::create(ex1, AsyncHandler{std::move(handler)});
p->user_data.aio_completion.reset(
Rados::aio_create_completion(p.get(), aio_dispatch));
if (cancel_handler) {
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0, bufferlist{});
+ ceph::async::post(std::move(p), std::move(op.aio_completion),
+ ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0);
+ ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
} else {
p.release(); // release ownership until completion
}
flags, &op.result);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0, bufferlist{});
+ ceph::async::post(std::move(p), std::move(op.aio_completion),
+ ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}
int ret = io.aio_operate(oid, op.aio_completion.get(), &write_op, flags, trace_ctx);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0);
+ ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
} else {
p.release(); // release ownership until completion
}
handle, ctx, timeout_ms);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0);
+ ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
} else {
p.release(); // release ownership until completion
}
int ret = io.aio_unwatch(handle, op.aio_completion.get());
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0);
+ ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
} else {
p.release(); // release ownership until completion
}
bl, timeout_ms, &op.result);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
- ceph::async::post(std::move(p), ec, 0, bufferlist{});
+ ceph::async::post(std::move(p), std::move(op.aio_completion),
+ ec, 0, bufferlist{});
} else {
p.release(); // release ownership until completion
}