]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados/asio: extend lifetime of AioCompletion
authorCasey Bodley <cbodley@redhat.com>
Tue, 12 May 2026 18:55:31 +0000 (14:55 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 18 May 2026 17:31:23 +0000 (13:31 -0400)
pass the unique_aio_completion_ptr into a wrapped completion handler to
extend its lifetime past the end of the AsyncOp::aio_dispatch() callback

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/librados/librados_asio.h

index 3402e4a7bde2f32b4edfbe3ac1260b2115a47621..077ab79b141bf6e04ba69dda51496fc48e1db72b 100644 (file)
@@ -16,6 +16,7 @@
 #define LIBRADOS_ASIO_H
 
 #include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/associator.hpp>
 #include <boost/asio/cancellation_type.hpp>
 #include <boost/asio/execution/executor.hpp>
 
@@ -60,29 +61,75 @@ using unique_aio_completion_ptr =
 template <typename Result>
 struct Invoker {
   using Signature = void(boost::system::error_code, version_t, Result);
+  using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t, Result);
   Result result;
   template <typename Completion>
-  void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
-    ceph::async::dispatch(std::move(completion), ec, ver, std::move(result));
+  void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion,
+                boost::system::error_code ec, version_t ver) {
+    ceph::async::dispatch(std::move(completion),
+                          std::move(aio_completion),
+                          ec, ver, std::move(result));
   }
 };
 // specialization for Result=void
 template <>
 struct Invoker<void> {
   using Signature = void(boost::system::error_code, version_t);
+  using InnerSignature = void(unique_aio_completion_ptr, boost::system::error_code, version_t);
   template <typename Completion>
-  void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
-    ceph::async::dispatch(std::move(completion), ec, ver);
+  void dispatch(Completion&& completion, unique_aio_completion_ptr aio_completion,
+                boost::system::error_code ec, version_t ver) {
+    ceph::async::dispatch(std::move(completion),
+                          std::move(aio_completion),
+                          ec, ver);
   }
 };
 
+template <typename Handler>
+struct AsyncHandler {
+  Handler handler;
+
+  AsyncHandler(Handler&& h) : handler(std::move(h)) {}
+
+  template <typename ...Args>
+  void operator()(unique_aio_completion_ptr aio_completion, Args&& ...args) {
+    aio_completion.reset();
+    std::move(handler)(std::forward<Args>(args)...);
+  }
+};
+
+} // namespace detail
+} // namespace librados
+
+namespace boost::asio {
+// forward associations from wrapped handler
+template <template<typename, typename> class Associator,
+          typename Handler, typename DefaultCandidate>
+struct associator<Associator, ::librados::detail::AsyncHandler<Handler>, DefaultCandidate>
+    : Associator<Handler, DefaultCandidate>
+{
+  static auto get(const ::librados::detail::AsyncHandler<Handler>& h) noexcept {
+    return Associator<Handler, DefaultCandidate>::get(h.handler);
+  }
+
+  static auto get(const ::librados::detail::AsyncHandler<Handler>& h,
+                  const DefaultCandidate& c) noexcept {
+    return Associator<Handler, DefaultCandidate>::get(h.handler, c);
+  }
+};
+} // namespace boost::asio
+
+namespace librados {
+namespace detail {
+
 template <typename Result>
 struct AsyncOp : Invoker<Result> {
   unique_aio_completion_ptr aio_completion;
   boost::asio::cancellation_slot slot;
 
   using Signature = typename Invoker<Result>::Signature;
-  using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
+  using InnerSignature = typename Invoker<Result>::InnerSignature;
+  using Completion = ceph::async::Completion<InnerSignature, AsyncOp<Result>>;
 
   static void aio_dispatch(completion_t cb, void *arg) {
     // reclaim ownership of the completion
@@ -98,7 +145,7 @@ struct AsyncOp : Invoker<Result> {
     if (ret < 0) {
       ec.assign(-ret, librados::detail::err_category());
     }
-    op.dispatch(std::move(p), ec, ver);
+    op.dispatch(std::move(p), std::move(op.aio_completion), ec, ver);
   }
 
   struct op_cancellation {
@@ -133,7 +180,7 @@ struct AsyncOp : Invoker<Result> {
       cancel_handler = &slot.template emplace<op_cancellation>();
     }
 
-    auto p = Completion::create(ex1, std::move(handler));
+    auto p = Completion::create(ex1, AsyncHandler{std::move(handler)});
     p->user_data.aio_completion.reset(
         Rados::aio_create_completion(p.get(), aio_dispatch));
     if (cancel_handler) {
@@ -170,7 +217,8 @@ auto async_read(IoExecutor ex, IoCtx& io, const std::string& oid,
         int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0, bufferlist{});
+          ceph::async::post(std::move(p), std::move(op.aio_completion),
+                            ec, 0, bufferlist{});
         } else {
           p.release(); // release ownership until completion
         }
@@ -200,7 +248,7 @@ auto async_write(IoExecutor ex, IoCtx& io, const std::string& oid,
         int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0);
+          ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
         } else {
           p.release(); // release ownership until completion
         }
@@ -231,7 +279,8 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid,
                                  flags, &op.result);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0, bufferlist{});
+          ceph::async::post(std::move(p), std::move(op.aio_completion),
+                            ec, 0, bufferlist{});
         } else {
           p.release(); // release ownership until completion
         }
@@ -262,7 +311,7 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid,
         int ret = io.aio_operate(oid, op.aio_completion.get(), &write_op, flags, trace_ctx);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0);
+          ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
         } else {
           p.release(); // release ownership until completion
         }
@@ -293,7 +342,7 @@ auto async_watch(IoExecutor ex, IoCtx& io, const std::string& oid,
                                 handle, ctx, timeout_ms);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0);
+          ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
         } else {
           p.release(); // release ownership until completion
         }
@@ -321,7 +370,7 @@ auto async_unwatch(IoExecutor ex, IoCtx& io, uint64_t handle,
         int ret = io.aio_unwatch(handle, op.aio_completion.get());
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0);
+          ceph::async::post(std::move(p), std::move(op.aio_completion), ec, 0);
         } else {
           p.release(); // release ownership until completion
         }
@@ -352,7 +401,8 @@ auto async_notify(IoExecutor ex, IoCtx& io, const std::string& oid,
                                 bl, timeout_ms, &op.result);
         if (ret < 0) {
           auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-          ceph::async::post(std::move(p), ec, 0, bufferlist{});
+          ceph::async::post(std::move(p), std::move(op.aio_completion),
+                            ec, 0, bufferlist{});
         } else {
           p.release(); // release ownership until completion
         }