From: Casey Bodley Date: Thu, 17 Oct 2024 02:18:02 +0000 (-0400) Subject: librados/asio: forward asio cancellations to AioCompletion::cancel() X-Git-Tag: v20.0.0~438^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3e40916227ae7f158d7f5c06ce955ae49823236e;p=ceph.git librados/asio: forward asio cancellations to AioCompletion::cancel() if the completion handler has a cancellation slot connected, construct a cancellation handler for it that calls AioCompletion::cancel() read operations can support more cancellation types than writes. see the table of cancellation types and their associated requirements/guarantees: https://www.boost.org/doc/libs/1_85_0/doc/html/boost_asio/overview/core/cancellation.html#boost_asio.overview.core.cancellation.t0 Signed-off-by: Casey Bodley --- diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 0aedc376575..d730aea73a0 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -14,6 +14,9 @@ #ifndef LIBRADOS_ASIO_H #define LIBRADOS_ASIO_H +#include +#include + #include "include/rados/librados.hpp" #include "common/async/completion.h" #include "librados/AioCompletionImpl.h" @@ -74,6 +77,7 @@ struct Invoker { template struct AsyncOp : Invoker { unique_aio_completion_ptr aio_completion; + boost::asio::cancellation_slot slot; using Signature = typename Invoker::Signature; using Completion = ceph::async::Completion>; @@ -83,6 +87,7 @@ struct AsyncOp : Invoker { auto p = std::unique_ptr{static_cast(arg)}; // move result out of Completion memory being freed auto op = std::move(p->user_data); + op.slot.clear(); // clear our cancellation handler // access AioCompletionImpl directly to avoid locking const librados::AioCompletionImpl* pc = op.aio_completion->pc; const int ret = pc->rval; @@ -94,11 +99,46 @@ struct AsyncOp : Invoker { op.dispatch(std::move(p), ec, ver); } + struct op_cancellation { + AioCompletion* completion = nullptr; + bool is_read = false; + + void operator()(boost::asio::cancellation_type type) { + if (completion == nullptr) { + return; // no AioCompletion attached + } else if (type == boost::asio::cancellation_type::none) { + return; // no cancellation requested + } else if (is_read) { + // read operations produce no side effects, so can satisfy the + // requirements of 'total' cancellation. the weaker requirements + // of 'partial' and 'terminal' are also satisfied + completion->cancel(); + } else if (type == boost::asio::cancellation_type::terminal) { + // write operations only support 'terminal' cancellation because we + // can't guarantee that no osd has succeeded (or will succeed) in + // applying the write + completion->cancel(); + } + } + }; + template - static auto create(const Executor1& ex1, CompletionHandler&& handler) { + static auto create(const Executor1& ex1, bool is_read, + CompletionHandler&& handler) { + op_cancellation* cancel_handler = nullptr; + auto slot = boost::asio::get_associated_cancellation_slot(handler); + if (slot.is_connected()) { + cancel_handler = &slot.template emplace(); + } + auto p = Completion::create(ex1, std::move(handler)); p->user_data.aio_completion.reset( Rados::aio_create_completion(p.get(), aio_dispatch)); + if (cancel_handler) { + cancel_handler->completion = p->user_data.aio_completion.get(); + cancel_handler->is_read = is_read; + p->user_data.slot = std::move(slot); + } return p; } }; @@ -117,7 +157,8 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, size_t len, uint64_t off) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = true; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); @@ -142,7 +183,8 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, const bufferlist &bl, size_t len, uint64_t off) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); @@ -167,7 +209,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, ObjectReadOperation *read_op, int flags) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = true; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, @@ -194,7 +237,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, [] (auto handler, auto ex, IoCtx& io, const std::string& oid, ObjectWriteOperation *write_op, int flags, const jspan_context* trace_ctx) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx); @@ -218,7 +262,8 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, bufferlist& bl, uint64_t timeout_ms) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_notify(oid, op.aio_completion.get(), diff --git a/src/test/librados/asio.cc b/src/test/librados/asio.cc index 01ebb957150..500f36508a7 100644 --- a/src/test/librados/asio.cc +++ b/src/test/librados/asio.cc @@ -21,10 +21,14 @@ #include #include +#include +#include #include #include #include +#include + #define dout_subsys ceph_subsys_rados #define dout_context g_ceph_context @@ -78,6 +82,15 @@ void rethrow(std::exception_ptr eptr) { if (eptr) std::rethrow_exception(eptr); } +auto capture(std::optional& out) { + return [&out] (error_code ec, ...) { out = ec; }; +} + +auto capture(boost::asio::cancellation_signal& signal, + std::optional& out) { + return boost::asio::bind_cancellation_slot(signal.slot(), capture(out)); +} + TEST_F(AsioRados, AsyncReadCallback) { boost::asio::io_context service; @@ -385,6 +398,130 @@ TEST_F(AsioRados, AsyncWriteOperationYield) service.run(); } +// FIXME: this crashes on windows with: +// Thread 1 received signal SIGILL, Illegal instruction. +#ifndef _WIN32 + +TEST_F(AsioRados, AsyncReadOperationCancelTerminal) +{ + // cancellation tests are racy, so retry if completion beats the cancellation + boost::system::error_code ec; + int tries = 10; + do { + boost::asio::io_context service; + boost::asio::cancellation_signal signal; + std::optional result; + + librados::ObjectReadOperation op; + op.assert_exists(); + librados::async_operate(service, io, "noexist", &op, 0, nullptr, + capture(signal, result)); + + service.poll(); + EXPECT_FALSE(service.stopped()); + EXPECT_FALSE(result); + + signal.emit(boost::asio::cancellation_type::terminal); + + service.run(); + ASSERT_TRUE(result); + ec = *result; + + signal.emit(boost::asio::cancellation_type::all); // noop + } while (ec == std::errc::no_such_file_or_directory && --tries); + + EXPECT_EQ(ec, boost::asio::error::operation_aborted); +} + +TEST_F(AsioRados, AsyncReadOperationCancelTotal) +{ + // cancellation tests are racy, so retry if completion beats the cancellation + boost::system::error_code ec; + int tries = 10; + do { + boost::asio::io_context service; + boost::asio::cancellation_signal signal; + std::optional result; + + librados::ObjectReadOperation op; + op.assert_exists(); + librados::async_operate(service, io, "noexist", &op, 0, nullptr, + capture(signal, result)); + + service.poll(); + EXPECT_FALSE(service.stopped()); + EXPECT_FALSE(result); + + signal.emit(boost::asio::cancellation_type::total); + + service.run(); + ASSERT_TRUE(result); + ec = *result; + + signal.emit(boost::asio::cancellation_type::all); // noop + } while (ec == std::errc::no_such_file_or_directory && --tries); + + EXPECT_EQ(ec, boost::asio::error::operation_aborted); +} + +TEST_F(AsioRados, AsyncWriteOperationCancelTerminal) +{ + // cancellation tests are racy, so retry if completion beats the cancellation + boost::system::error_code ec; + int tries = 10; + do { + boost::asio::io_context service; + boost::asio::cancellation_signal signal; + std::optional result; + + librados::ObjectWriteOperation op; + op.assert_exists(); + librados::async_operate(service, io, "noexist", &op, 0, nullptr, + capture(signal, result)); + + service.poll(); + EXPECT_FALSE(service.stopped()); + EXPECT_FALSE(result); + + signal.emit(boost::asio::cancellation_type::terminal); + + service.run(); + ASSERT_TRUE(result); + ec = *result; + + signal.emit(boost::asio::cancellation_type::all); // noop + } while (ec == std::errc::no_such_file_or_directory && --tries); + + EXPECT_EQ(ec, boost::asio::error::operation_aborted); +} + +TEST_F(AsioRados, AsyncWriteOperationCancelTotal) +{ + boost::asio::io_context service; + boost::asio::cancellation_signal signal; + std::optional ec; + + librados::ObjectWriteOperation op; + op.assert_exists(); + librados::async_operate(service, io, "noexist", &op, 0, nullptr, + capture(signal, ec)); + + service.poll(); + EXPECT_FALSE(service.stopped()); + EXPECT_FALSE(ec); + + // noop, write only supports terminal + signal.emit(boost::asio::cancellation_type::total); + + service.run(); + ASSERT_TRUE(ec); + EXPECT_EQ(ec, std::errc::no_such_file_or_directory); + + signal.emit(boost::asio::cancellation_type::all); // noop +} + +#endif // not _WIN32 + int main(int argc, char **argv) { auto args = argv_to_vec(argc, argv);