From 27bbb966dd688bc7b448a5e58689fcc92da264c3 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 12 May 2026 14:55:31 -0400 Subject: [PATCH] librados/asio: extend lifetime of AioCompletion pass the unique_aio_completion_ptr into a wrapped completion handler to extend its lifetime past the end of the AsyncOp::aio_dispatch() callback Signed-off-by: Casey Bodley --- src/librados/librados_asio.h | 78 +++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 3402e4a7bde2..077ab79b141b 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -16,6 +16,7 @@ #define LIBRADOS_ASIO_H #include +#include #include #include @@ -60,29 +61,75 @@ using unique_aio_completion_ptr = template struct Invoker { using Signature = void(boost::system::error_code, version_t, Result); + using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t, Result); Result result; template - void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) { - ceph::async::dispatch(std::move(completion), ec, ver, std::move(result)); + void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion, + boost::system::error_code ec, version_t ver) { + ceph::async::dispatch(std::move(completion), + std::move(aio_completion), + ec, ver, std::move(result)); } }; // specialization for Result=void template <> struct Invoker { using Signature = void(boost::system::error_code, version_t); + using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t); template - void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) { - ceph::async::dispatch(std::move(completion), ec, ver); + void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion, + boost::system::error_code ec, version_t ver) { + ceph::async::dispatch(std::move(completion), + std::move(aio_completion), + ec, ver); } }; +template +struct AsyncHandler { + Handler handler; + + AsyncHandler(Handler&& h) : handler(std::move(h)) {} + + template + void operator()(unique_aio_completion_ptr aio_completion, Args&& ...args) { + aio_completion.reset(); + std::move(handler)(std::forward(args)...); + } +}; + +} // namespace detail +} // namespace librados + +namespace boost::asio { +// forward associations from wrapped handler +template class Associator, + typename Handler, typename DefaultCandidate> +struct associator, DefaultCandidate> + : Associator +{ + static auto get(const ::librados::detail::AsyncHandler& h) noexcept { + return Associator::get(h.handler); + } + + static auto get(const ::librados::detail::AsyncHandler& h, + const DefaultCandidate& c) noexcept { + return Associator::get(h.handler, c); + } +}; +} // namespace boost::asio + +namespace librados { +namespace detail { + template struct AsyncOp : Invoker { unique_aio_completion_ptr aio_completion; boost::asio::cancellation_slot slot; using Signature = typename Invoker::Signature; - using Completion = ceph::async::Completion>; + using InnerSignature = typename Invoker::InnerSignature; + using Completion = ceph::async::Completion>; static void aio_dispatch(completion_t cb, void *arg) { // reclaim ownership of the completion @@ -98,7 +145,7 @@ struct AsyncOp : Invoker { if (ret < 0) { ec.assign(-ret, librados::detail::err_category()); } - op.dispatch(std::move(p), ec, ver); + op.dispatch(std::move(p), std::move(op.aio_completion), ec, ver); } struct op_cancellation { @@ -133,7 +180,7 @@ struct AsyncOp : Invoker { cancel_handler = &slot.template emplace(); } - auto p = Completion::create(ex1, std::move(handler)); + auto p = Completion::create(ex1, AsyncHandler{std::move(handler)}); p->user_data.aio_completion.reset( Rados::aio_create_completion(p.get(), aio_dispatch)); if (cancel_handler) { @@ -170,7 +217,8 @@ auto async_read(IoExecutor ex, IoCtx& io, const std::string& oid, int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0, bufferlist{}); + ceph::async::post(std::move(p), std::move(op.aio_completion), + ec, 0, bufferlist{}); } else { p.release(); // release ownership until completion } @@ -200,7 +248,7 @@ auto async_write(IoExecutor ex, IoCtx& io, const std::string& oid, int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0); + ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0); } else { p.release(); // release ownership until completion } @@ -231,7 +279,8 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid, flags, &op.result); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0, bufferlist{}); + ceph::async::post(std::move(p), std::move(op.aio_completion), + ec, 0, bufferlist{}); } else { p.release(); // release ownership until completion } @@ -262,7 +311,7 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid, int ret = io.aio_operate(oid, op.aio_completion.get(), &write_op, flags, trace_ctx); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0); + ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0); } else { p.release(); // release ownership until completion } @@ -293,7 +342,7 @@ auto async_watch(IoExecutor ex, IoCtx& io, const std::string& oid, handle, ctx, timeout_ms); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0); + ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0); } else { p.release(); // release ownership until completion } @@ -321,7 +370,7 @@ auto async_unwatch(IoExecutor ex, IoCtx& io, uint64_t handle, int ret = io.aio_unwatch(handle, op.aio_completion.get()); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0); + ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0); } else { p.release(); // release ownership until completion } @@ -352,7 +401,8 @@ auto async_notify(IoExecutor ex, IoCtx& io, const std::string& oid, bl, timeout_ms, &op.result); if (ret < 0) { auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, 0, bufferlist{}); + ceph::async::post(std::move(p), std::move(op.aio_completion), + ec, 0, bufferlist{}); } else { p.release(); // release ownership until completion } -- 2.47.3