From: Casey Bodley Date: Thu, 15 Feb 2024 03:36:46 +0000 (-0500) Subject: librados/asio: functions use async_initiate X-Git-Tag: v19.2.1~172^2~8 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a1d8c2ec3b63bf685fc24e5fd87d11bbf7a3cafb;p=ceph.git librados/asio: functions use async_initiate Signed-off-by: Casey Bodley (cherry picked from commit 675df440fc2f9a64228db74ce1004bc393f09e86) --- diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 2eae1c268f6cc..19a8c8fc01de4 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -110,18 +110,20 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + size_t len, uint64_t off) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, len, off); } /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a @@ -133,18 +135,20 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + bufferlist &bl, size_t len, uint64_t off) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, bl, len, off); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a @@ -152,23 +156,25 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, template auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, ObjectReadOperation *read_op, int flags, - CompletionToken&& token, const jspan_context* trace_ctx = nullptr) + const jspan_context* trace_ctx, CompletionToken&& token) { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, - flags, &op.result); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + ObjectReadOperation *read_op, int flags) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, + flags, &op.result); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, read_op, flags); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a @@ -176,22 +182,25 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, template auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, ObjectWriteOperation *write_op, int flags, - CompletionToken &&token, const jspan_context* trace_ctx = nullptr) + const jspan_context* trace_ctx, CompletionToken &&token) { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + ObjectWriteOperation *write_op, int flags, + const jspan_context* trace_ctx) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, write_op, flags, trace_ctx); } /// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a @@ -202,19 +211,21 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid, { using Op = detail::AsyncOp; using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = io.aio_notify(oid, op.aio_completion.get(), - bl, timeout_ms, &op.result); - if (ret < 0) { - auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - p.release(); // release ownership until completion - } - return init.result.get(); + return boost::asio::async_initiate( + [] (auto handler, auto ex, IoCtx& io, const std::string& oid, + bufferlist& bl, uint64_t timeout_ms) { + auto p = Op::create(ex, std::move(handler)); + auto& op = p->user_data; + + int ret = io.aio_notify(oid, op.aio_completion.get(), + bl, timeout_ms, &op.result); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + p.release(); // release ownership until completion + } + }, token, ctx.get_executor(), io, oid, bl, timeout_ms); } } // namespace librados diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc index c143875538aad..41254f9519e34 100644 --- a/src/rgw/driver/rados/rgw_tools.cc +++ b/src/rgw/driver/rados/rgw_tools.cc @@ -207,7 +207,7 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con auto& yield = y.get_yield_context(); boost::system::error_code ec; auto bl = librados::async_operate( - context, ioctx, oid, op, flags, yield[ec]); + context, ioctx, oid, op, flags, trace_info, yield[ec]); if (pbl) { *pbl = std::move(bl); } @@ -231,7 +231,7 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; - librados::async_operate(context, ioctx, oid, op, flags, yield[ec], trace_info); + librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]); return -ec.value(); } if (is_asio_thread) { diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc index 293dea13217cb..1c9a54f072647 100644 --- a/src/rgw/rgw_aio.cc +++ b/src/rgw/rgw_aio.cc @@ -98,8 +98,8 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, async_completion init(yield); auto ex = get_associated_executor(init.completion_handler); - librados::async_operate(context, ctx, r.obj.oid, &op, 0, - bind_executor(ex, Handler{aio, ctx, r}), trace_ctx); + librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx, + bind_executor(ex, Handler{aio, ctx, r})); }; } diff --git a/src/test/librados/asio.cc b/src/test/librados/asio.cc index 459ce6896a540..0ede2e14fb465 100644 --- a/src/test/librados/asio.cc +++ b/src/test/librados/asio.cc @@ -208,7 +208,7 @@ TEST_F(AsioRados, AsyncReadOperationCallback) EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - librados::async_operate(service, io, "exist", &op, 0, success_cb); + librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb); } { librados::ObjectReadOperation op; @@ -216,7 +216,7 @@ TEST_F(AsioRados, AsyncReadOperationCallback) auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) { EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; - librados::async_operate(service, io, "noexist", &op, 0, failure_cb); + librados::async_operate(service, io, "noexist", &op, 0, nullptr, failure_cb); } service.run(); } @@ -228,14 +228,14 @@ TEST_F(AsioRados, AsyncReadOperationFuture) { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); - f1 = librados::async_operate(service, io, "exist", &op, 0, + f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr, boost::asio::use_future); } std::future f2; { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); - f2 = librados::async_operate(service, io, "noexist", &op, 0, + f2 = librados::async_operate(service, io, "noexist", &op, 0, nullptr, boost::asio::use_future); } service.run(); @@ -255,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; - auto bl = librados::async_operate(service, io, "exist", &op, 0, + auto bl = librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]); EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); @@ -266,7 +266,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; - auto bl = librados::async_operate(service, io, "noexist", &op, 0, + auto bl = librados::async_operate(service, io, "noexist", &op, 0, nullptr, yield[ec]); EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; @@ -288,7 +288,7 @@ TEST_F(AsioRados, AsyncWriteOperationCallback) auto success_cb = [&] (boost::system::error_code ec) { EXPECT_FALSE(ec); }; - librados::async_operate(service, io, "exist", &op, 0, success_cb); + librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb); } { librados::ObjectWriteOperation op; @@ -296,7 +296,7 @@ TEST_F(AsioRados, AsyncWriteOperationCallback) auto failure_cb = [&] (boost::system::error_code ec) { EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; - librados::async_operate(service, snapio, "exist", &op, 0, failure_cb); + librados::async_operate(service, snapio, "exist", &op, 0, nullptr, failure_cb); } service.run(); } @@ -312,14 +312,14 @@ TEST_F(AsioRados, AsyncWriteOperationFuture) { librados::ObjectWriteOperation op; op.write_full(bl); - f1 = librados::async_operate(service, io, "exist", &op, 0, + f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr, boost::asio::use_future); } std::future f2; { librados::ObjectWriteOperation op; op.write_full(bl); - f2 = librados::async_operate(service, snapio, "exist", &op, 0, + f2 = librados::async_operate(service, snapio, "exist", &op, 0, nullptr, boost::asio::use_future); } service.run(); @@ -339,7 +339,7 @@ TEST_F(AsioRados, AsyncWriteOperationYield) librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; - librados::async_operate(service, io, "exist", &op, 0, yield[ec]); + librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]); EXPECT_FALSE(ec); }; spawn::spawn(service, success_cr); @@ -348,7 +348,7 @@ TEST_F(AsioRados, AsyncWriteOperationYield) librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; - librados::async_operate(service, snapio, "exist", &op, 0, yield[ec]); + librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]); EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; spawn::spawn(service, failure_cr);