]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librados/asio: forward asio cancellations to AioCompletion::cancel()
authorCasey Bodley <cbodley@redhat.com>
Thu, 17 Oct 2024 02:18:02 +0000 (22:18 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 19 Nov 2024 13:35:29 +0000 (08:35 -0500)
if the completion handler has a cancellation slot connected, construct a
cancellation handler for it that calls AioCompletion::cancel()

read operations can support more cancellation types than writes. see the
table of cancellation types and their associated requirements/guarantees:

https://www.boost.org/doc/libs/1_85_0/doc/html/boost_asio/overview/core/cancellation.html#boost_asio.overview.core.cancellation.t0

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/librados/librados_asio.h
src/test/librados/asio.cc

index 0aedc3765752976644bb82ca847f8369b8a584ae..d730aea73a006fe6ac64088d5c9ba95749da6951 100644 (file)
@@ -14,6 +14,9 @@
 #ifndef LIBRADOS_ASIO_H
 #define LIBRADOS_ASIO_H
 
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/cancellation_type.hpp>
+
 #include "include/rados/librados.hpp"
 #include "common/async/completion.h"
 #include "librados/AioCompletionImpl.h"
@@ -74,6 +77,7 @@ struct Invoker<void> {
 template <typename Result>
 struct AsyncOp : Invoker<Result> {
   unique_aio_completion_ptr aio_completion;
+  boost::asio::cancellation_slot slot;
 
   using Signature = typename Invoker<Result>::Signature;
   using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
@@ -83,6 +87,7 @@ struct AsyncOp : Invoker<Result> {
     auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
     // move result out of Completion memory being freed
     auto op = std::move(p->user_data);
+    op.slot.clear(); // clear our cancellation handler
     // access AioCompletionImpl directly to avoid locking
     const librados::AioCompletionImpl* pc = op.aio_completion->pc;
     const int ret = pc->rval;
@@ -94,11 +99,46 @@ struct AsyncOp : Invoker<Result> {
     op.dispatch(std::move(p), ec, ver);
   }
 
+  struct op_cancellation {
+    AioCompletion* completion = nullptr;
+    bool is_read = false;
+
+    void operator()(boost::asio::cancellation_type type) {
+      if (completion == nullptr) {
+        return; // no AioCompletion attached
+      } else if (type == boost::asio::cancellation_type::none) {
+        return; // no cancellation requested
+      } else if (is_read) {
+        // read operations produce no side effects, so can satisfy the
+        // requirements of 'total' cancellation. the weaker requirements
+        // of 'partial' and 'terminal' are also satisfied
+        completion->cancel();
+      } else if (type == boost::asio::cancellation_type::terminal) {
+        // write operations only support 'terminal' cancellation because we
+        // can't guarantee that no osd has succeeded (or will succeed) in
+        // applying the write
+        completion->cancel();
+      }
+    }
+  };
+
   template <typename Executor1, typename CompletionHandler>
-  static auto create(const Executor1& ex1, CompletionHandler&& handler) {
+  static auto create(const Executor1& ex1, bool is_read,
+                     CompletionHandler&& handler) {
+    op_cancellation* cancel_handler = nullptr;
+    auto slot = boost::asio::get_associated_cancellation_slot(handler);
+    if (slot.is_connected()) {
+      cancel_handler = &slot.template emplace<op_cancellation>();
+    }
+
     auto p = Completion::create(ex1, std::move(handler));
     p->user_data.aio_completion.reset(
         Rados::aio_create_completion(p.get(), aio_dispatch));
+    if (cancel_handler) {
+      cancel_handler->completion = p->user_data.aio_completion.get();
+      cancel_handler->is_read = is_read;
+      p->user_data.slot = std::move(slot);
+    }
     return p;
   }
 };
@@ -117,7 +157,8 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
   return boost::asio::async_initiate<CompletionToken, Signature>(
       [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
           size_t len, uint64_t off) {
-        auto p = Op::create(ex, std::move(handler));
+        constexpr bool is_read = true;
+        auto p = Op::create(ex, is_read, std::move(handler));
         auto& op = p->user_data;
 
         int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
@@ -142,7 +183,8 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
   return boost::asio::async_initiate<CompletionToken, Signature>(
       [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
           const bufferlist &bl, size_t len, uint64_t off) {
-        auto p = Op::create(ex, std::move(handler));
+        constexpr bool is_read = false;
+        auto p = Op::create(ex, is_read, std::move(handler));
         auto& op = p->user_data;
 
         int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
@@ -167,7 +209,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
   return boost::asio::async_initiate<CompletionToken, Signature>(
       [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
           ObjectReadOperation *read_op, int flags) {
-        auto p = Op::create(ex, std::move(handler));
+        constexpr bool is_read = true;
+        auto p = Op::create(ex, is_read, std::move(handler));
         auto& op = p->user_data;
 
         int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
@@ -194,7 +237,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
       [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
           ObjectWriteOperation *write_op, int flags,
           const jspan_context* trace_ctx) {
-        auto p = Op::create(ex, std::move(handler));
+        constexpr bool is_read = false;
+        auto p = Op::create(ex, is_read, std::move(handler));
         auto& op = p->user_data;
 
         int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
@@ -218,7 +262,8 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
   return boost::asio::async_initiate<CompletionToken, Signature>(
       [] (auto handler, auto ex, IoCtx& io, const std::string& oid,
           bufferlist& bl, uint64_t timeout_ms) {
-        auto p = Op::create(ex, std::move(handler));
+        constexpr bool is_read = false;
+        auto p = Op::create(ex, is_read, std::move(handler));
         auto& op = p->user_data;
 
         int ret = io.aio_notify(oid, op.aio_completion.get(),
index 01ebb95715034667741f5944d26debdc5b32b47f..500f36508a71b4ab4fb981c2144a10b43c6ddaf3 100644 (file)
 
 #include <boost/range/begin.hpp>
 #include <boost/range/end.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/spawn.hpp>
 #include <boost/asio/use_future.hpp>
 
+#include <optional>
+
 #define dout_subsys ceph_subsys_rados
 #define dout_context g_ceph_context
 
@@ -78,6 +82,15 @@ void rethrow(std::exception_ptr eptr) {
   if (eptr) std::rethrow_exception(eptr);
 }
 
+auto capture(std::optional<error_code>& out) {
+  return [&out] (error_code ec, ...) { out = ec; };
+}
+
+auto capture(boost::asio::cancellation_signal& signal,
+             std::optional<error_code>& out) {
+  return boost::asio::bind_cancellation_slot(signal.slot(), capture(out));
+}
+
 TEST_F(AsioRados, AsyncReadCallback)
 {
   boost::asio::io_context service;
@@ -385,6 +398,130 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
   service.run();
 }
 
+// FIXME: this crashes on windows with:
+// Thread 1 received signal SIGILL, Illegal instruction.
+#ifndef _WIN32
+
+TEST_F(AsioRados, AsyncReadOperationCancelTerminal)
+{
+  // cancellation tests are racy, so retry if completion beats the cancellation
+  boost::system::error_code ec;
+  int tries = 10;
+  do {
+    boost::asio::io_context service;
+    boost::asio::cancellation_signal signal;
+    std::optional<error_code> result;
+
+    librados::ObjectReadOperation op;
+    op.assert_exists();
+    librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+                            capture(signal, result));
+
+    service.poll();
+    EXPECT_FALSE(service.stopped());
+    EXPECT_FALSE(result);
+
+    signal.emit(boost::asio::cancellation_type::terminal);
+
+    service.run();
+    ASSERT_TRUE(result);
+    ec = *result;
+
+    signal.emit(boost::asio::cancellation_type::all); // noop
+  } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+  EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncReadOperationCancelTotal)
+{
+  // cancellation tests are racy, so retry if completion beats the cancellation
+  boost::system::error_code ec;
+  int tries = 10;
+  do {
+    boost::asio::io_context service;
+    boost::asio::cancellation_signal signal;
+    std::optional<error_code> result;
+
+    librados::ObjectReadOperation op;
+    op.assert_exists();
+    librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+                            capture(signal, result));
+
+    service.poll();
+    EXPECT_FALSE(service.stopped());
+    EXPECT_FALSE(result);
+
+    signal.emit(boost::asio::cancellation_type::total);
+
+    service.run();
+    ASSERT_TRUE(result);
+    ec = *result;
+
+    signal.emit(boost::asio::cancellation_type::all); // noop
+  } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+  EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncWriteOperationCancelTerminal)
+{
+  // cancellation tests are racy, so retry if completion beats the cancellation
+  boost::system::error_code ec;
+  int tries = 10;
+  do {
+    boost::asio::io_context service;
+    boost::asio::cancellation_signal signal;
+    std::optional<error_code> result;
+
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+                            capture(signal, result));
+
+    service.poll();
+    EXPECT_FALSE(service.stopped());
+    EXPECT_FALSE(result);
+
+    signal.emit(boost::asio::cancellation_type::terminal);
+
+    service.run();
+    ASSERT_TRUE(result);
+    ec = *result;
+
+    signal.emit(boost::asio::cancellation_type::all); // noop
+  } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+  EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncWriteOperationCancelTotal)
+{
+  boost::asio::io_context service;
+  boost::asio::cancellation_signal signal;
+  std::optional<error_code> ec;
+
+  librados::ObjectWriteOperation op;
+  op.assert_exists();
+  librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+                          capture(signal, ec));
+
+  service.poll();
+  EXPECT_FALSE(service.stopped());
+  EXPECT_FALSE(ec);
+
+  // noop, write only supports terminal
+  signal.emit(boost::asio::cancellation_type::total);
+
+  service.run();
+  ASSERT_TRUE(ec);
+  EXPECT_EQ(ec, std::errc::no_such_file_or_directory);
+
+  signal.emit(boost::asio::cancellation_type::all); // noop
+}
+
+#endif // not _WIN32
+
 int main(int argc, char **argv)
 {
   auto args = argv_to_vec(argc, argv);