From: Casey Bodley Date: Thu, 15 Feb 2024 03:36:46 +0000 (-0500) Subject: librados/asio: functions use async_initiate X-Git-Tag: v18.2.5~278^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8fa7a27c2464b1ae3638fb6160119fa5e73b77f1;p=ceph.git librados/asio: functions use async_initiate Signed-off-by: Casey Bodley (cherry picked from commit 675df440fc2f9a64228db74ce1004bc393f09e86) Conflicts: jaeger tracing not backported, no trace_ctx src/librados/librados_asio.h src/rgw/driver/rados/rgw_tools.cc src/rgw/rgw_aio.cc --- diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index bd672d951f73..4d276c59b6be 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -110,18 +110,20 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + size_t len, uint64_t off) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, len, off); } /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a @@ -133,18 +135,20 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + bufferlist &bl, size_t len, uint64_t off) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, bl, len, off); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a @@ -156,19 +160,21 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, - flags, &op.result); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + ObjectReadOperation *read_op, int flags) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, + flags, &op.result); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, read_op, flags); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a @@ -180,18 +186,20 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + ObjectWriteOperation *write_op, int flags) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, write_op, flags); } /// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a @@ -202,19 +210,21 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_notify(oid, op.aio_completion.get(), - bl, timeout_ms, &op.result); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + bufferlist& bl, uint64_t timeout_ms) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_notify(oid, op.aio_completion.get(), + bl, timeout_ms, &op.result); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, bl, timeout_ms); } } // namespace librados