]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados/asio: functions use async_initiate
authorCasey Bodley <cbodley@redhat.com>
Thu, 15 Feb 2024 03:36:46 +0000 (22:36 -0500)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 May 2024 16:13:39 +0000 (12:13 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/librados/librados_asio.h
src/rgw/driver/rados/rgw_tools.cc
src/rgw/rgw_aio.cc
src/test/librados/asio.cc

index 2eae1c268f6cc2ba4933e25ac1dfa48d5b4f6993..19a8c8fc01de4b70e211bc19284f7e6e6a095ab9 100644 (file)
@@ -110,18 +110,20 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
 {
   using Op = detail::AsyncOp<bufferlist>;
   using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
-  if (ret < 0) {
-    auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-    ceph::async::post(std::move(p), ec, bufferlist{});
-  } else {
-    p.release(); // release ownership until completion
-  }
-  return init.result.get();
+  return boost::asio::async_initiate<CompletionToken, Signature>(
+      [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
+          size_t len, uint64_t off) {
+        auto p = Op::create(ex, std::move(handler));
+        auto& op = p->user_data;
+
+        int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
+        if (ret < 0) {
+          auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
+          ceph::async::post(std::move(p), ec, bufferlist{});
+        } else {
+          p.release(); // release ownership until completion
+        }
+      }, token, ctx.get_executor(), io, oid, len, off);
 }
 
 /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
@@ -133,18 +135,20 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
 {
   using Op = detail::AsyncOp<void>;
   using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
-  if (ret < 0) {
-    auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-    ceph::async::post(std::move(p), ec);
-  } else {
-    p.release(); // release ownership until completion
-  }
-  return init.result.get();
+  return boost::asio::async_initiate<CompletionToken, Signature>(
+      [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
+          bufferlist &bl, size_t len, uint64_t off) {
+        auto p = Op::create(ex, std::move(handler));
+        auto& op = p->user_data;
+
+        int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
+        if (ret < 0) {
+          auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
+          ceph::async::post(std::move(p), ec);
+        } else {
+          p.release(); // release ownership until completion
+        }
+      }, token, ctx.get_executor(), io, oid, bl, len, off);
 }
 
 /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
@@ -152,23 +156,25 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
 template <typename ExecutionContext, typename CompletionToken>
 auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
                    ObjectReadOperation *read_op, int flags,
-                   CompletionToken&& token, const jspan_context* trace_ctx = nullptr)
+                   const jspan_context* trace_ctx, CompletionToken&& token)
 {
   using Op = detail::AsyncOp<bufferlist>;
   using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
-                           flags, &op.result);
-  if (ret < 0) {
-    auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-    ceph::async::post(std::move(p), ec, bufferlist{});
-  } else {
-    p.release(); // release ownership until completion
-  }
-  return init.result.get();
+  return boost::asio::async_initiate<CompletionToken, Signature>(
+      [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
+          ObjectReadOperation *read_op, int flags) {
+        auto p = Op::create(ex, std::move(handler));
+        auto& op = p->user_data;
+
+        int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
+                                 flags, &op.result);
+        if (ret < 0) {
+          auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
+          ceph::async::post(std::move(p), ec, bufferlist{});
+        } else {
+          p.release(); // release ownership until completion
+        }
+      }, token, ctx.get_executor(), io, oid, read_op, flags);
 }
 
 /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
@@ -176,22 +182,25 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
 template <typename ExecutionContext, typename CompletionToken>
 auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
                    ObjectWriteOperation *write_op, int flags,
-                   CompletionToken &&token, const jspan_context* trace_ctx = nullptr)
+                   const jspan_context* trace_ctx, CompletionToken &&token)
 {
   using Op = detail::AsyncOp<void>;
   using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
-  if (ret < 0) {
-    auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-    ceph::async::post(std::move(p), ec);
-  } else {
-    p.release(); // release ownership until completion
-  }
-  return init.result.get();
+  return boost::asio::async_initiate<CompletionToken, Signature>(
+      [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
+          ObjectWriteOperation *write_op, int flags,
+          const jspan_context* trace_ctx) {
+        auto p = Op::create(ex, std::move(handler));
+        auto& op = p->user_data;
+
+        int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
+        if (ret < 0) {
+          auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
+          ceph::async::post(std::move(p), ec);
+        } else {
+          p.release(); // release ownership until completion
+        }
+      }, token, ctx.get_executor(), io, oid, write_op, flags, trace_ctx);
 }
 
 /// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
@@ -202,19 +211,21 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
 {
   using Op = detail::AsyncOp<bufferlist>;
   using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = io.aio_notify(oid, op.aio_completion.get(),
-                          bl, timeout_ms, &op.result);
-  if (ret < 0) {
-    auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
-    ceph::async::post(std::move(p), ec, bufferlist{});
-  } else {
-    p.release(); // release ownership until completion
-  }
-  return init.result.get();
+  return boost::asio::async_initiate<CompletionToken, Signature>(
+      [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
+          bufferlist& bl, uint64_t timeout_ms) {
+        auto p = Op::create(ex, std::move(handler));
+        auto& op = p->user_data;
+
+        int ret = io.aio_notify(oid, op.aio_completion.get(),
+                                bl, timeout_ms, &op.result);
+        if (ret < 0) {
+          auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
+          ceph::async::post(std::move(p), ec, bufferlist{});
+        } else {
+          p.release(); // release ownership until completion
+        }
+      }, token, ctx.get_executor(), io, oid, bl, timeout_ms);
 }
 
 } // namespace librados
index c143875538aadc2c3b1f0268e399293c8993859e..41254f9519e344a70577d552f69837c717bd5a20 100644 (file)
@@ -207,7 +207,7 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
     auto bl = librados::async_operate(
-      context, ioctx, oid, op, flags, yield[ec]);
+      context, ioctx, oid, op, flags, trace_info, yield[ec]);
     if (pbl) {
       *pbl = std::move(bl);
     }
@@ -231,7 +231,7 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
     auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
-    librados::async_operate(context, ioctx, oid, op, flags, yield[ec], trace_info);
+    librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]);
     return -ec.value();
   }
   if (is_asio_thread) {
index cfbedfccecc4c52d6aa9dab5e7aa51649d96f0da..5b2ce1569b33ae18f3d45abd07f50ebc89466c6e 100644 (file)
@@ -99,8 +99,8 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
       async_completion<spawn::yield_context, void()> init(yield);
       auto ex = get_associated_executor(init.completion_handler);
 
-      librados::async_operate(context, ctx, r.obj.oid, &op, 0,
-                              bind_executor(ex, Handler{aio, ctx, r}), trace_ctx);
+      librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx,
+                              bind_executor(ex, Handler{aio, ctx, r}));
     };
 }
 
index 459ce6896a54001d907216ab66373afc6c9e1894..0ede2e14fb46530d9b6252e4d27911be94316f51 100644 (file)
@@ -208,7 +208,7 @@ TEST_F(AsioRados, AsyncReadOperationCallback)
       EXPECT_FALSE(ec);
       EXPECT_EQ("hello", bl.to_str());
     };
-    librados::async_operate(service, io, "exist", &op, 0, success_cb);
+    librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb);
   }
   {
     librados::ObjectReadOperation op;
@@ -216,7 +216,7 @@ TEST_F(AsioRados, AsyncReadOperationCallback)
     auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) {
       EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
     };
-    librados::async_operate(service, io, "noexist", &op, 0, failure_cb);
+    librados::async_operate(service, io, "noexist", &op, 0, nullptr, failure_cb);
   }
   service.run();
 }
@@ -228,14 +228,14 @@ TEST_F(AsioRados, AsyncReadOperationFuture)
   {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
-    f1 = librados::async_operate(service, io, "exist", &op, 0,
+    f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr,
                                  boost::asio::use_future);
   }
   std::future<bufferlist> f2;
   {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
-    f2 = librados::async_operate(service, io, "noexist", &op, 0,
+    f2 = librados::async_operate(service, io, "noexist", &op, 0, nullptr,
                                  boost::asio::use_future);
   }
   service.run();
@@ -255,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
-    auto bl = librados::async_operate(service, io, "exist", &op, 0,
+    auto bl = librados::async_operate(service, io, "exist", &op, 0, nullptr,
                                       yield[ec]);
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
@@ -266,7 +266,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
-    auto bl = librados::async_operate(service, io, "noexist", &op, 0,
+    auto bl = librados::async_operate(service, io, "noexist", &op, 0, nullptr,
                                       yield[ec]);
     EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
   };
@@ -288,7 +288,7 @@ TEST_F(AsioRados, AsyncWriteOperationCallback)
     auto success_cb = [&] (boost::system::error_code ec) {
       EXPECT_FALSE(ec);
     };
-    librados::async_operate(service, io, "exist", &op, 0, success_cb);
+    librados::async_operate(service, io, "exist", &op, 0, nullptr, success_cb);
   }
   {
     librados::ObjectWriteOperation op;
@@ -296,7 +296,7 @@ TEST_F(AsioRados, AsyncWriteOperationCallback)
     auto failure_cb = [&] (boost::system::error_code ec) {
       EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
     };
-    librados::async_operate(service, snapio, "exist", &op, 0, failure_cb);
+    librados::async_operate(service, snapio, "exist", &op, 0, nullptr, failure_cb);
   }
   service.run();
 }
@@ -312,14 +312,14 @@ TEST_F(AsioRados, AsyncWriteOperationFuture)
   {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
-    f1 = librados::async_operate(service, io, "exist", &op, 0,
+    f1 = librados::async_operate(service, io, "exist", &op, 0, nullptr,
                                  boost::asio::use_future);
   }
   std::future<void> f2;
   {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
-    f2 = librados::async_operate(service, snapio, "exist", &op, 0,
+    f2 = librados::async_operate(service, snapio, "exist", &op, 0, nullptr,
                                  boost::asio::use_future);
   }
   service.run();
@@ -339,7 +339,7 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
-    librados::async_operate(service, io, "exist", &op, 0, yield[ec]);
+    librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_FALSE(ec);
   };
   spawn::spawn(service, success_cr);
@@ -348,7 +348,7 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
-    librados::async_operate(service, snapio, "exist", &op, 0, yield[ec]);
+    librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
   };
   spawn::spawn(service, failure_cr);