#ifndef LIBRADOS_ASIO_H
#define LIBRADOS_ASIO_H
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/cancellation_type.hpp>
+
#include "include/rados/librados.hpp"
#include "common/async/completion.h"
#include "librados/AioCompletionImpl.h"
template <typename Result>
struct AsyncOp : Invoker<Result> {
unique_aio_completion_ptr aio_completion;
+ boost::asio::cancellation_slot slot;
using Signature = typename Invoker<Result>::Signature;
using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
// move result out of Completion memory being freed
auto op = std::move(p->user_data);
+ op.slot.clear(); // clear our cancellation handler
// access AioCompletionImpl directly to avoid locking
const librados::AioCompletionImpl* pc = op.aio_completion->pc;
const int ret = pc->rval;
op.dispatch(std::move(p), ec, ver);
}
+ struct op_cancellation {
+ AioCompletion* completion = nullptr;
+ bool is_read = false;
+
+ void operator()(boost::asio::cancellation_type type) {
+ if (completion == nullptr) {
+ return; // no AioCompletion attached
+ } else if (type == boost::asio::cancellation_type::none) {
+ return; // no cancellation requested
+ } else if (is_read) {
+ // read operations produce no side effects, so can satisfy the
+ // requirements of 'total' cancellation. the weaker requirements
+ // of 'partial' and 'terminal' are also satisfied
+ completion->cancel();
+ } else if (type == boost::asio::cancellation_type::terminal) {
+ // write operations only support 'terminal' cancellation because we
+ // can't guarantee that no osd has succeeded (or will succeed) in
+ // applying the write
+ completion->cancel();
+ }
+ }
+ };
+
template <typename Executor1, typename CompletionHandler>
- static auto create(const Executor1& ex1, CompletionHandler&& handler) {
+ static auto create(const Executor1& ex1, bool is_read,
+ CompletionHandler&& handler) {
+ op_cancellation* cancel_handler = nullptr;
+ auto slot = boost::asio::get_associated_cancellation_slot(handler);
+ if (slot.is_connected()) {
+ cancel_handler = &slot.template emplace<op_cancellation>();
+ }
+
auto p = Completion::create(ex1, std::move(handler));
p->user_data.aio_completion.reset(
Rados::aio_create_completion(p.get(), aio_dispatch));
+ if (cancel_handler) {
+ cancel_handler->completion = p->user_data.aio_completion.get();
+ cancel_handler->is_read = is_read;
+ p->user_data.slot = std::move(slot);
+ }
return p;
}
};
return boost::asio::async_initiate<CompletionToken, Signature>(
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
size_t len, uint64_t off) {
- auto p = Op::create(ex, std::move(handler));
+ constexpr bool is_read = true;
+ auto p = Op::create(ex, is_read, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
return boost::asio::async_initiate<CompletionToken, Signature>(
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
const bufferlist &bl, size_t len, uint64_t off) {
- auto p = Op::create(ex, std::move(handler));
+ constexpr bool is_read = false;
+ auto p = Op::create(ex, is_read, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
return boost::asio::async_initiate<CompletionToken, Signature>(
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
ObjectReadOperation *read_op, int flags) {
- auto p = Op::create(ex, std::move(handler));
+ constexpr bool is_read = true;
+ auto p = Op::create(ex, is_read, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
ObjectWriteOperation *write_op, int flags,
const jspan_context* trace_ctx) {
- auto p = Op::create(ex, std::move(handler));
+ constexpr bool is_read = false;
+ auto p = Op::create(ex, is_read, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
return boost::asio::async_initiate<CompletionToken, Signature>(
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
bufferlist& bl, uint64_t timeout_ms) {
- auto p = Op::create(ex, std::move(handler));
+ constexpr bool is_read = false;
+ auto p = Op::create(ex, is_read, std::move(handler));
auto& op = p->user_data;
int ret = io.aio_notify(oid, op.aio_completion.get(),
#include <boost/range/begin.hpp>
#include <boost/range/end.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
+#include <optional>
+
#define dout_subsys ceph_subsys_rados
#define dout_context g_ceph_context
if (eptr) std::rethrow_exception(eptr);
}
+auto capture(std::optional<error_code>& out) {
+ return [&out] (error_code ec, ...) { out = ec; };
+}
+
+auto capture(boost::asio::cancellation_signal& signal,
+ std::optional<error_code>& out) {
+ return boost::asio::bind_cancellation_slot(signal.slot(), capture(out));
+}
+
TEST_F(AsioRados, AsyncReadCallback)
{
boost::asio::io_context service;
service.run();
}
+// FIXME: this crashes on windows with:
+// Thread 1 received signal SIGILL, Illegal instruction.
+#ifndef _WIN32
+
+TEST_F(AsioRados, AsyncReadOperationCancelTerminal)
+{
+ // cancellation tests are racy, so retry if completion beats the cancellation
+ boost::system::error_code ec;
+ int tries = 10;
+ do {
+ boost::asio::io_context service;
+ boost::asio::cancellation_signal signal;
+ std::optional<error_code> result;
+
+ librados::ObjectReadOperation op;
+ op.assert_exists();
+ librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+ capture(signal, result));
+
+ service.poll();
+ EXPECT_FALSE(service.stopped());
+ EXPECT_FALSE(result);
+
+ signal.emit(boost::asio::cancellation_type::terminal);
+
+ service.run();
+ ASSERT_TRUE(result);
+ ec = *result;
+
+ signal.emit(boost::asio::cancellation_type::all); // noop
+ } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+ EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncReadOperationCancelTotal)
+{
+ // cancellation tests are racy, so retry if completion beats the cancellation
+ boost::system::error_code ec;
+ int tries = 10;
+ do {
+ boost::asio::io_context service;
+ boost::asio::cancellation_signal signal;
+ std::optional<error_code> result;
+
+ librados::ObjectReadOperation op;
+ op.assert_exists();
+ librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+ capture(signal, result));
+
+ service.poll();
+ EXPECT_FALSE(service.stopped());
+ EXPECT_FALSE(result);
+
+ signal.emit(boost::asio::cancellation_type::total);
+
+ service.run();
+ ASSERT_TRUE(result);
+ ec = *result;
+
+ signal.emit(boost::asio::cancellation_type::all); // noop
+ } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+ EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncWriteOperationCancelTerminal)
+{
+ // cancellation tests are racy, so retry if completion beats the cancellation
+ boost::system::error_code ec;
+ int tries = 10;
+ do {
+ boost::asio::io_context service;
+ boost::asio::cancellation_signal signal;
+ std::optional<error_code> result;
+
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+ capture(signal, result));
+
+ service.poll();
+ EXPECT_FALSE(service.stopped());
+ EXPECT_FALSE(result);
+
+ signal.emit(boost::asio::cancellation_type::terminal);
+
+ service.run();
+ ASSERT_TRUE(result);
+ ec = *result;
+
+ signal.emit(boost::asio::cancellation_type::all); // noop
+ } while (ec == std::errc::no_such_file_or_directory && --tries);
+
+ EXPECT_EQ(ec, boost::asio::error::operation_aborted);
+}
+
+TEST_F(AsioRados, AsyncWriteOperationCancelTotal)
+{
+ boost::asio::io_context service;
+ boost::asio::cancellation_signal signal;
+ std::optional<error_code> ec;
+
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ librados::async_operate(service, io, "noexist", &op, 0, nullptr,
+ capture(signal, ec));
+
+ service.poll();
+ EXPECT_FALSE(service.stopped());
+ EXPECT_FALSE(ec);
+
+ // noop, write only supports terminal
+ signal.emit(boost::asio::cancellation_type::total);
+
+ service.run();
+ ASSERT_TRUE(ec);
+ EXPECT_EQ(ec, std::errc::no_such_file_or_directory);
+
+ signal.emit(boost::asio::cancellation_type::all); // noop
+}
+
+#endif // not _WIN32
+
int main(int argc, char **argv)
{
auto args = argv_to_vec(argc, argv);