From 2c6f777f83ba9ea355d148ad5df85959c7ebd22b Mon Sep 17 00:00:00 2001
From: Casey Bodley <cbodley@redhat.com>
Date: Thu, 7 Nov 2024 15:36:45 -0500
Subject: [PATCH] rgw/rados: add concurrent io algorithms for sharded data

cls/rgw provides the base class CLSRGWConcurrentIO as a swiss army knife
for bucket index operations that visit every shard object. while it uses
asynchronous librados requests to perform the io, it blocks on a condition
variable when waiting for the AioCompletions.

for use in coroutines, we need a version of this that suspends instead of
blocking. and to support both stackful and stackless coroutines, we want a
fully-generic async interface templated on CompletionToken.

while the CLSRGWConcurrentIO algorithm works for all current uses (reads
and writes, with/without retries, with/without cleanup), i chose to break
this into 3 algorithms with well-defined semantics:

1. reads: to produce a successful result, all shard operations must
   succeed. so any shard's failure causes the rest to be cancelled or
   skipped. supports retries for ListBucket (RGWBIAdvanceAndRetryError).

2. writes: even if some shards fail, we still want to visit every shard
   before returning the error. supports retries for log trimming
   operations (repeat until ENODATA).

3. revertible writes: similar to reads, requires all shard operations to
   succeed. on any failure, the side effects of any successful writes must
   be reverted before returning. only used by IndexInit (any created
   shards are removed on failure).

each algorithm provides a pure virtual base class that must be implemented
for each type of operation, similar to how existing operations inherit
from CLSRGWConcurrentIO.

Signed-off-by: Casey Bodley <cbodley@redhat.com>
---
 src/rgw/driver/rados/shard_io.h   |  735 +++++++++++
 src/test/rgw/CMakeLists.txt       |    4 +
 src/test/rgw/test_rgw_shard_io.cc | 1973 +++++++++++++++++++++++++++++
 3 files changed, 2712 insertions(+)
 create mode 100644 src/rgw/driver/rados/shard_io.h
 create mode 100644 src/test/rgw/test_rgw_shard_io.cc

diff --git a/src/rgw/driver/rados/shard_io.h b/src/rgw/driver/rados/shard_io.h
new file mode 100644
index 00000000000..3996e025349
--- /dev/null
+++ b/src/rgw/driver/rados/shard_io.h
@@ -0,0 +1,735 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <boost/asio/any_completion_handler.hpp>
+#include <boost/asio/any_io_executor.hpp>
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/cancellation_state.hpp>
+#include <boost/asio/co_composed.hpp>
+#include <boost/asio/deferred.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include "librados/librados_asio.h"
+#include "common/dout.h"
+
+/// Concurrent io algorithms for data sharded across rados objects.
+namespace rgwrados::shard_io {
+
+using boost::system::error_code;
+
+/// Classify the result of a single shard operation.
+enum class Result { Success, Retry, Error };
+
+namespace detail {
+
+struct RevertibleWriteHandler;
+struct RevertHandler;
+struct WriteHandler;
+struct ReadHandler;
+
+} // namespace detail
+
+/// Interface for async_writes() that can initiate write operations and
+/// their inverse. The latter is used to restore the original state on
+/// error or cancellation.
+///
+/// \see RadosRevertibleWriter
+class RevertibleWriter : public DoutPrefixPipe {
+ public:
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return ex; }
+
+  RevertibleWriter(const DoutPrefixProvider& dpp, executor_type ex)
+    : DoutPrefixPipe(dpp), ex(std::move(ex)) {}
+
+  virtual ~RevertibleWriter() {}
+
+  /// Initiate an async write operation for the given shard object.
+  virtual void write(int shard, const std::string& object,
+                     detail::RevertibleWriteHandler&& handler) = 0;
+
+  /// Classify the result of a completed shard write operation.
+  virtual Result on_complete(int shard, error_code ec) {
+    return ec ? Result::Error : Result::Success;
+  }
+
+  /// Initiate an async operation to revert the side effects of write().
+  /// Revert operations do not call on_complete() on completion.
+  virtual void revert(int shard, const std::string& object,
+                      detail::RevertHandler&& handler) = 0;
+
+  void add_prefix(std::ostream& out) const override {
+    out << "shard writer: ";
+  }
+
+ private:
+  executor_type ex;
+};
+
+/// Interface for async_writes() that can initiate write operations.
+///
+/// \see RadosWriter
+class Writer : public DoutPrefixPipe {
+ public:
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return ex; }
+
+  Writer(const DoutPrefixProvider& dpp, executor_type ex)
+    : DoutPrefixPipe(dpp), ex(std::move(ex)) {}
+
+  virtual ~Writer() {}
+
+  /// Initiate an async write operation for the given shard object.
+  virtual void write(int shard, const std::string& object,
+                     detail::WriteHandler&& handler) = 0;
+
+  /// Classify the result of a completed shard write operation. Returning
+  /// Retry will trigger another write() and its corresponding on_complete().
+  virtual Result on_complete(int shard, error_code ec) {
+    return ec ? Result::Error : Result::Success;
+  }
+
+  void add_prefix(std::ostream& out) const override {
+    out << "shard writer: ";
+  }
+
+ private:
+  executor_type ex;
+};
+
+/// Interface for async_reads() that can initiate read operations.
+///
+/// \see RadosReader
+class Reader : public DoutPrefixPipe {
+ public:
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return ex; }
+
+  Reader(const DoutPrefixProvider& dpp, executor_type ex)
+    : DoutPrefixPipe(dpp), ex(std::move(ex)) {}
+
+  virtual ~Reader() {}
+
+  /// Initiate an async read operation for the given shard object.
+  virtual void read(int shard, const std::string& object,
+                    detail::ReadHandler&& handler) = 0;
+
+  /// Classify the result of a completed shard read operation. Returning
+  /// Retry will trigger another read() and its corresponding on_complete().
+  virtual Result on_complete(int shard, error_code ec) {
+    return ec ? Result::Error : Result::Success;
+  }
+
+  void add_prefix(std::ostream& out) const override {
+    out << "shard reader: ";
+  }
+
+ private:
+  executor_type ex;
+};
+
+namespace detail {
+
+struct Shard : boost::intrusive::list_base_hook<> {
+  boost::asio::cancellation_signal signal;
+  std::map<int, std::string>::const_iterator iter;
+  int id() const { return iter->first; }
+  const std::string& object() const { return iter->second; }
+};
+using ShardList = boost::intrusive::list<Shard>;
+
+inline auto make_shards(const std::map<int, std::string>& objects)
+  -> std::vector<Shard>
+{
+  // allocate the shards and assign an 'objects' iterator to each
+  auto shards = std::vector<Shard>(objects.size());
+  auto s = shards.begin();
+  for (auto o = objects.begin(); o != objects.end(); ++o, ++s) {
+    s->iter = o;
+  }
+  return shards;
+}
+
+// handler that wakes the caller of async_wait()
+using WaitHandler = boost::asio::any_completion_handler<void()>;
+
+// cancellation handler that forwards signals to all outstanding requests
+struct WaitCancellation {
+  ShardList& sending;
+  ShardList& outstanding;
+  ShardList* completed;
+  error_code& failure;
+  bool& terminal;
+
+  WaitCancellation(ShardList& sending, ShardList& outstanding,
+                   ShardList* completed, error_code& failure, bool& terminal)
+    : sending(sending), outstanding(outstanding), completed(completed),
+      failure(failure), terminal(terminal) {}
+
+  void operator()(boost::asio::cancellation_type type) {
+    if (type != boost::asio::cancellation_type::none) {
+      if (!!(type & boost::asio::cancellation_type::terminal)) {
+        terminal = true;
+      }
+      if (!failure) {
+        failure = make_error_code(boost::asio::error::operation_aborted);
+        if (completed) {
+          // for RevertibleWriter, schedule completed shards for reverts
+          sending = std::move(*completed);
+        }
+      }
+      auto i = outstanding.begin();
+      while (i != outstanding.end()) {
+        // emit() may dispatch the completion that removes i from outstanding
+        auto& shard = *i++;
+        shard.signal.emit(type);
+      }
+    }
+  }
+};
+
+// suspend the calling coroutine and capture a WaitHandler to resume it
+template <typename CompletionToken>
+auto async_wait(WaitHandler& wakeup, ShardList& sending, ShardList& outstanding,
+                ShardList* completed, error_code& failure, bool& terminal,
+                CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken, void()>(
+      [&, completed] (auto handler) {
+        wakeup = std::move(handler);
+        auto slot = boost::asio::get_associated_cancellation_slot(wakeup);
+        if (slot.is_connected()) {
+          slot.template emplace<WaitCancellation>(
+              sending, outstanding, completed, failure, terminal);
+        }
+      }, token);
+}
+
+inline void maybe_complete(WaitHandler& wakeup, ShardList& sending,
+                           ShardList& outstanding, bool terminal)
+{
+  const bool ready_to_send = !sending.empty() && !terminal;
+  const bool done_waiting = outstanding.empty();
+  if (wakeup && (ready_to_send || done_waiting)) {
+    auto slot = boost::asio::get_associated_cancellation_slot(wakeup);
+    slot.clear(); // remove async_wait()'s cancellation handler
+    boost::asio::dispatch(std::move(wakeup));
+  }
+}
+
+struct RevertibleWriteHandler {
+  RevertibleWriter& writer;
+  ShardList& sending;
+  ShardList& outstanding;
+  ShardList& completed;
+  WaitHandler& waiter;
+  error_code& failure;
+  bool& terminal;
+  Shard& shard;
+
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return writer.get_executor(); }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept {
+    return shard.signal.slot();
+  }
+
+  void operator()(error_code ec, version_t = {}) {
+    outstanding.erase(outstanding.iterator_to(shard));
+
+    const auto result = writer.on_complete(shard.id(), ec);
+    switch (result) {
+      case Result::Retry:
+        ldpp_dout(&writer, 20) << "write on '" << shard.object()
+            << "' needs retry: " << ec.message() << dendl;
+        if (!failure) {
+          // reschedule the shard for sending
+          sending.push_back(shard);
+        }
+        break;
+      case Result::Success:
+        if (!failure) {
+          // track as 'completed' in case reverts are necessary
+          completed.push_back(shard);
+        } else if (!terminal) {
+          // add directly to 'sending' for revert
+          sending.push_back(shard);
+        }
+        break;
+      case Result::Error:
+        ldpp_dout(&writer, 4) << "write on '" << shard.object()
+            << "' failed: " << ec.message() << dendl;
+        if (!failure) {
+          failure = ec;
+          // schedule completed shards for reverts
+          sending = std::move(completed);
+        }
+        break;
+    }
+
+    maybe_complete(waiter, sending, outstanding, terminal);
+  }
+}; // struct RevertibleWriteHandler
+
+struct RevertHandler {
+  RevertibleWriter& writer;
+  ShardList& sending;
+  ShardList& outstanding;
+  WaitHandler& waiter;
+  bool& terminal;
+  Shard& shard;
+
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return writer.get_executor(); }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept {
+    return shard.signal.slot();
+  }
+
+  void operator()(error_code ec, version_t = {}) {
+    outstanding.erase(outstanding.iterator_to(shard));
+
+    if (ec) {
+      ldpp_dout(&writer, 4) << "revert on '" << shard.object()
+          << "' failed: " << ec.message() << dendl;
+    }
+
+    maybe_complete(waiter, sending, outstanding, terminal);
+  }
+}; // struct RevertHandler
+
+} // namespace detail
+
+/// Try to apply concurrent write operations to the shard objects. Upon error
+/// or partial/total cancellation, revert any operations that succeeded.
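+///
+/// A minimal usage sketch (illustrative only; MyIndexInit, shard_objects and
+/// max_aio are hypothetical names), driving the algorithm from a stackful
+/// coroutine via boost::asio::yield_context:
+///
+///   MyIndexInit writer{dpp, yield.get_executor(), ioctx}; // a RevertibleWriter
+///   error_code ec;
+///   async_writes(writer, shard_objects, max_aio, yield[ec]);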
+template <boost::asio::completion_token_for<void(error_code)> CompletionToken>
+auto async_writes(RevertibleWriter& writer,
+                  const std::map<int, std::string>& objects,
+                  size_t max_concurrent,
+                  CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
+      boost::asio::co_composed<void(error_code)>(
+        [] (auto state, RevertibleWriter& writer,
+            const std::map<int, std::string>& objects,
+            size_t max_concurrent) -> void
+        {
+          state.reset_cancellation_state(
+              boost::asio::enable_total_cancellation());
+
+          // initialize the shards array and schedule each for sending
+          auto shards = detail::make_shards(objects);
+          detail::ShardList sending{shards.begin(), shards.end()};
+          detail::ShardList outstanding;
+          detail::ShardList completed;
+          detail::WaitHandler waiter;
+          error_code failure;
+          bool terminal = false;
+
+          for (;;) {
+            while (!sending.empty() && !terminal) {
+              if (outstanding.size() >= max_concurrent) {
+                // wait for the next completion before sending another
+                co_await detail::async_wait(
+                    waiter, sending, outstanding, &completed,
+                    failure, terminal, boost::asio::deferred);
+
+                if (!!state.cancelled()) {
+                  // once partial/total cancellation is requested, only
+                  // terminal cancellation can stop the reverts
+                  state.reset_cancellation_state(
+                      boost::asio::enable_terminal_cancellation());
+                }
+                continue; // recheck loop conditions
+              }
+
+              // prepare and send the next shard object operation
+              detail::Shard& shard = sending.front();
+              sending.pop_front();
+              outstanding.push_back(shard);
+
+              if (failure) {
+                writer.revert(shard.id(), shard.object(),
+                              detail::RevertHandler{
+                                writer, sending, outstanding,
+                                waiter, terminal, shard});
+              } else {
+                writer.write(shard.id(), shard.object(),
+                             detail::RevertibleWriteHandler{
+                               writer, sending, outstanding, completed,
+                               waiter, failure, terminal, shard});
+              }
+            } // while (!sending.empty() && !terminal)
+
+            if (outstanding.empty()) {
+              // nothing left to send or receive, we're done
+              co_return failure;
+            }
+
+            // wait for outstanding completions
+            co_await detail::async_wait(
+                waiter, sending, outstanding, &completed,
+                failure, terminal, boost::asio::deferred);
+
+            if (!!state.cancelled()) {
+              state.reset_cancellation_state(
+                  boost::asio::enable_terminal_cancellation());
+            }
+          } // for (;;)
+          // unreachable
+        }, writer),
+      token, std::ref(writer), objects, max_concurrent);
+} // async_writes(RevertibleWriter)
+
+namespace detail {
+
+struct WriteHandler {
+  Writer& writer;
+  ShardList& sending;
+  ShardList& outstanding;
+  WaitHandler& waiter;
+  error_code& failure;
+  bool& terminal;
+  Shard& shard;
+
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return writer.get_executor(); }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept {
+    return shard.signal.slot();
+  }
+
+  void operator()(error_code ec, version_t = {}) {
+    outstanding.erase(outstanding.iterator_to(shard));
+
+    const auto result = writer.on_complete(shard.id(), ec);
+    switch (result) {
+      case Result::Retry:
+        ldpp_dout(&writer, 20) << "write on '" << shard.object()
+            << "' needs retry: " << ec.message() << dendl;
+        // reschedule the shard for sending
+        sending.push_back(shard);
+        break;
+      case Result::Success:
+        break;
+      case Result::Error:
+        ldpp_dout(&writer, 4) << "write on '" << shard.object()
+            << "' failed: " << ec.message() << dendl;
+        if (!failure) {
+          failure = ec;
+        }
+        break;
+    }
+
+    maybe_complete(waiter, sending, outstanding, terminal);
+  }
+}; // struct WriteHandler
+
+} // namespace detail
+
+/// Make an effort to apply a write operation to all shard objects. On error,
+/// finish the operations and retries on other shards before returning.
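+///
+/// A minimal usage sketch (illustrative only; MyLogTrimmer, shard_objects and
+/// max_aio are hypothetical names). A trimming Writer would return
+/// Result::Retry from on_complete() until the shard reports ENODATA:
+///
+///   MyLogTrimmer writer{dpp, yield.get_executor(), ioctx}; // a Writer
+///   error_code ec;
+///   async_writes(writer, shard_objects, max_aio, yield[ec]);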
+template <boost::asio::completion_token_for<void(error_code)> CompletionToken>
+auto async_writes(Writer& writer,
+                  const std::map<int, std::string>& objects,
+                  size_t max_concurrent,
+                  CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
+      boost::asio::co_composed<void(error_code)>(
+        [] (auto state, Writer& writer,
+            const std::map<int, std::string>& objects,
+            size_t max_concurrent) -> void
+        {
+          state.reset_cancellation_state(
+              boost::asio::enable_terminal_cancellation());
+
+          // initialize the shards array and schedule each for sending
+          auto shards = detail::make_shards(objects);
+          detail::ShardList sending{shards.begin(), shards.end()};
+          detail::ShardList outstanding;
+          detail::WaitHandler waiter;
+          error_code failure;
+          bool terminal = false;
+
+          for (;;) {
+            while (!sending.empty() && !terminal) {
+              if (outstanding.size() >= max_concurrent) {
+                // wait for the next completion before sending another
+                co_await detail::async_wait(
+                    waiter, sending, outstanding, nullptr,
+                    failure, terminal, boost::asio::deferred);
+                continue; // recheck loop conditions
+              }
+
+              // prepare and send the next shard object operation
+              detail::Shard& shard = sending.front();
+              sending.pop_front();
+              outstanding.push_back(shard);
+
+              writer.write(shard.id(), shard.object(),
+                           detail::WriteHandler{writer, sending, outstanding,
+                                                waiter, failure, terminal, shard});
+            } // while (!sending.empty() && !terminal)
+
+            if (outstanding.empty()) {
+              // nothing left to send or receive, we're done
+              co_return failure;
+            }
+
+            // await the next completion (it may schedule a retry)
+            co_await detail::async_wait(
+                waiter, sending, outstanding, nullptr,
+                failure, terminal, boost::asio::deferred);
+          } // for (;;)
+          // unreachable
+        }, writer),
+      token, std::ref(writer), objects, max_concurrent);
+} // async_writes(Writer)
+
+namespace detail {
+
+struct ReadHandler {
+  Reader& reader;
+  ShardList& sending;
+  ShardList& outstanding;
+  WaitHandler& waiter;
+  error_code& failure;
+  bool& terminal;
+  Shard& shard;
+
+  using executor_type = boost::asio::any_io_executor;
+  executor_type get_executor() const { return reader.get_executor(); }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept {
+    return shard.signal.slot();
+  }
+
+  void operator()(error_code ec, version_t = {}, bufferlist = {}) {
+    outstanding.erase(outstanding.iterator_to(shard));
+
+    bool need_cancel = false;
+
+    const auto result = reader.on_complete(shard.id(), ec);
+    switch (result) {
+      case Result::Retry:
+        // reschedule the shard for sending
+        sending.push_back(shard);
+        ldpp_dout(&reader, 20) << "read on '" << shard.object()
+            << "' needs retry: " << ec.message() << dendl;
+        break;
+      case Result::Success:
+        break;
+      case Result::Error:
+        ldpp_dout(&reader, 4) << "read on '" << shard.object()
+            << "' failed: " << ec.message() << dendl;
+        if (!failure) {
+          failure = ec;
+          // trigger cancellations after our call to maybe_complete(). one of
+          // the cancellations may trigger completion and cause async_reads()
+          // to co_return. our call to maybe_complete() would then access
+          // variables that were destroyed with async_reads()'s stack
+          need_cancel = !outstanding.empty();
+        }
+        break;
+    }
+
+    maybe_complete(waiter, sending, outstanding, terminal);
+
+    if (need_cancel) {
+      // cancel outstanding requests
+      auto i = outstanding.begin();
+      while (i != outstanding.end()) {
+        // emit() may recurse and remove i
+        auto& s = *i++;
+        s.signal.emit(boost::asio::cancellation_type::terminal);
+      }
+    }
+  }
+}; // struct ReadHandler
+
+} // namespace detail
+
+/// Issue concurrent read operations to all shard objects, aborting on error.
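+///
+/// A minimal usage sketch (illustrative only; MyListReader, shard_objects and
+/// max_aio are hypothetical names), shown with a stackless coroutine token:
+///
+///   auto ex = co_await boost::asio::this_coro::executor;
+///   MyListReader reader{dpp, ex, ioctx}; // a Reader
+///   auto [ec] = co_await async_reads(reader, shard_objects, max_aio,
+///       boost::asio::as_tuple(boost::asio::use_awaitable));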
+template <boost::asio::completion_token_for<void(error_code)> CompletionToken>
+auto async_reads(Reader& reader,
+                 const std::map<int, std::string>& objects,
+                 size_t max_concurrent,
+                 CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
+      boost::asio::co_composed<void(error_code)>(
+        [] (auto state, Reader& reader,
+            const std::map<int, std::string>& objects,
+            size_t max_concurrent) -> void
+        {
+          state.reset_cancellation_state(
+              boost::asio::enable_terminal_cancellation());
+
+          // initialize the shards array and schedule each for sending
+          auto shards = detail::make_shards(objects);
+          detail::ShardList sending{shards.begin(), shards.end()};
+          detail::ShardList outstanding;
+          detail::WaitHandler waiter;
+          error_code failure;
+          bool terminal = false;
+
+          for (;;) {
+            while (!sending.empty() && !failure) {
+              if (outstanding.size() >= max_concurrent) {
+                // wait for the next completion before sending another
+                co_await detail::async_wait(
+                    waiter, sending, outstanding, nullptr,
+                    failure, terminal, boost::asio::deferred);
+                continue; // recheck loop conditions
+              }
+
+              // prepare and send the next shard object operation
+              detail::Shard& shard = sending.front();
+              sending.pop_front();
+              outstanding.push_back(shard);
+
+              reader.read(shard.id(), shard.object(),
+                          detail::ReadHandler{reader, sending, outstanding,
+                                              waiter, failure, terminal, shard});
+            } // while (!sending.empty() && !failure)
+
+            if (outstanding.empty()) {
+              // nothing left to send or receive, we're done
+              co_return failure;
+            }
+
+            // await the next completion (it may schedule a retry)
+            co_await detail::async_wait(
+                waiter, sending, outstanding, nullptr,
+                failure, terminal, boost::asio::deferred);
+          } // for (;;)
+          // unreachable
+        }, reader),
+      token, std::ref(reader), objects, max_concurrent);
+} // async_reads(Reader)
+
+
+/// Interface for async_writes() that can prepare rados write operations
+/// and their inverse.
+class RadosRevertibleWriter : public RevertibleWriter {
+ public:
+  RadosRevertibleWriter(const DoutPrefixProvider& dpp, executor_type ex,
+                        librados::IoCtx& ioctx)
+    : RevertibleWriter(dpp, std::move(ex)), ioctx(ioctx)
+  {}
+
+  /// Prepare a rados write operation for the given shard.
+  virtual void prepare_write(int shard, librados::ObjectWriteOperation& op) = 0;
+
+  /// Prepare a rados operation to revert the side effects of prepare_write().
+  /// Revert operations do not call on_complete() on completion.
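+  ///
+  /// An illustrative write/revert pair (a sketch, not required by this
+  /// interface): bucket index init might create each shard object
+  /// exclusively and remove it again on revert:
+  ///
+  ///   void prepare_write(int, librados::ObjectWriteOperation& op) override {
+  ///     op.create(true); // exclusive create
+  ///   }
+  ///   void prepare_revert(int, librados::ObjectWriteOperation& op) override {
+  ///     op.remove();
+  ///   }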
+  virtual void prepare_revert(int shard, librados::ObjectWriteOperation& op) = 0;
+
+ private:
+  // implement write() and revert() in terms of prepare_write(),
+  // prepare_revert() and librados::async_operate()
+  void write(int shard, const std::string& object,
+             detail::RevertibleWriteHandler&& handler) final {
+    librados::ObjectWriteOperation op;
+    prepare_write(shard, op);
+
+    constexpr int flags = 0;
+    constexpr jspan_context* trace = nullptr;
+    librados::async_operate(get_executor(), ioctx, object, std::move(op),
+                            flags, trace, std::move(handler));
+  }
+
+  void revert(int shard, const std::string& object,
+              detail::RevertHandler&& handler) final {
+    librados::ObjectWriteOperation op;
+    prepare_revert(shard, op);
+
+    constexpr int flags = 0;
+    constexpr jspan_context* trace = nullptr;
+    librados::async_operate(get_executor(), ioctx, object, std::move(op),
+                            flags, trace, std::move(handler));
+  }
+
+  librados::IoCtx& ioctx;
+};
+
+/// Interface for async_writes() that can prepare rados write operations.
+class RadosWriter : public Writer {
+ public:
+  RadosWriter(const DoutPrefixProvider& dpp, executor_type ex,
+              librados::IoCtx& ioctx)
+    : Writer(dpp, std::move(ex)), ioctx(ioctx)
+  {}
+
+  /// Prepare a rados write operation for the given shard.
+  virtual void prepare_write(int shard, librados::ObjectWriteOperation& op) = 0;
+
+ private:
+  // implement write() in terms of prepare_write() and librados::async_operate()
+  void write(int shard, const std::string& object,
+             detail::WriteHandler&& handler) final {
+    librados::ObjectWriteOperation op;
+    prepare_write(shard, op);
+
+    constexpr int flags = 0;
+    constexpr jspan_context* trace = nullptr;
+    librados::async_operate(get_executor(), ioctx, object, std::move(op),
+                            flags, trace, std::move(handler));
+  }
+
+  librados::IoCtx& ioctx;
+};
+
+/// Interface for async_reads() that can prepare rados read operations.
+class RadosReader : public Reader {
+ public:
+  RadosReader(const DoutPrefixProvider& dpp, executor_type ex,
+              librados::IoCtx& ioctx)
+    : Reader(dpp, std::move(ex)), ioctx(ioctx)
+  {}
+
+  /// Prepare a rados read operation for the given shard.
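+  ///
+  /// An illustrative sketch, where 'headers' is a hypothetical per-shard
+  /// result container owned by the derived reader:
+  ///
+  ///   void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+  ///     op.omap_get_header(&headers[shard], nullptr);
+  ///   }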
+  virtual void prepare_read(int shard, librados::ObjectReadOperation& op) = 0;
+
+ private:
+  // implement read() in terms of prepare_read() and librados::async_operate()
+  void read(int shard, const std::string& object,
+            detail::ReadHandler&& handler) final {
+    librados::ObjectReadOperation op;
+    prepare_read(shard, op);
+
+    constexpr int flags = 0;
+    constexpr jspan_context* trace = nullptr;
+    librados::async_operate(get_executor(), ioctx, object, std::move(op),
+                            flags, trace, std::move(handler));
+  }
+
+  librados::IoCtx& ioctx;
+};
+
+} // namespace rgwrados::shard_io
diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt
index cf4252c90a1..f98fedd36c0 100644
--- a/src/test/rgw/CMakeLists.txt
+++ b/src/test/rgw/CMakeLists.txt
@@ -303,6 +303,10 @@ target_include_directories(ceph_test_rgw_gc_log
 target_link_libraries(ceph_test_rgw_gc_log ${rgw_libs} radostest-cxx)
 install(TARGETS ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR})
 
+add_executable(unittest_rgw_shard_io test_rgw_shard_io.cc)
+add_ceph_unittest(unittest_rgw_shard_io)
+target_link_libraries(unittest_rgw_shard_io ${rgw_libs} unit-main ${UNITTEST_LIBS})
+
 add_ceph_test(test-ceph-diff-sorted.sh
   ${CMAKE_CURRENT_SOURCE_DIR}/test-ceph-diff-sorted.sh)
 
diff --git a/src/test/rgw/test_rgw_shard_io.cc b/src/test/rgw/test_rgw_shard_io.cc
new file mode 100644
index 00000000000..b0b63980e37
--- /dev/null
+++ b/src/test/rgw/test_rgw_shard_io.cc
@@ -0,0 +1,1973 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "driver/rados/shard_io.h"
+
+#include <limits>
+#include <optional>
+
+#include <boost/asio/append.hpp>
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/system/errc.hpp>
+#include <boost/system/error_code.hpp>
+
+#include <gtest/gtest.h>
+
+#include "global/global_context.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+using boost::system::error_code;
+using boost::system::errc::io_error;
+using boost::system::errc::no_such_file_or_directory;
+using boost::system::errc::operation_canceled;
+using boost::system::errc::resource_unavailable_try_again;
+
+constexpr size_t infinite_aio = std::numeric_limits<size_t>::max();
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(boost::asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  return boost::asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+// handler wrapper that removes itself from a list on cancellation
+template <typename Handler>
+struct MockHandler :
+    boost::intrusive::list_base_hook<
+        boost::intrusive::link_mode<
+            boost::intrusive::auto_unlink>> {
+  Handler handler;
+
+  struct Cancel {
+    MockHandler* self;
+    explicit Cancel(MockHandler* self) : self(self) {}
+
+    void operator()(boost::asio::cancellation_type type) {
+      if (!!(type & boost::asio::cancellation_type::terminal)) {
+        auto tmp = std::move(self->handler);
+        delete self; // auto unlink
+        auto ec = make_error_code(operation_canceled);
+        boost::asio::dispatch(boost::asio::append(std::move(tmp), ec));
+      }
+    }
+  };
+
+  MockHandler(Handler&& h) : handler(std::move(h)) {
+    auto slot = boost::asio::get_associated_cancellation_slot(handler);
+    if (slot.is_connected()) {
+      slot.template emplace<Cancel>(this);
+    }
+  }
+
+  void operator()(error_code ec) {
+    auto slot = boost::asio::get_associated_cancellation_slot(handler);
+    slot.clear();
+    std::move(handler)(ec);
+  }
+
+  const Handler* operator->() const { return &handler; }
+};
+
+namespace boost::asio {
+
+// forward wrapped handler's associations
+template