#pragma once
#include <exception>
+#include <mutex>
#include <optional>
#include <boost/asio/append.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
namespace ceph::async {
+namespace detail {
+
+// handler wrapper that reacquires a lock immediately before completion
+template <typename Handler, typename BasicLockable>
+struct lock_handler {
+ Handler handler;
+ BasicLockable* lock = nullptr;
+
+ template <typename ...Args>
+ void operator()(Args&& ...args) {
+ if (lock) {
+ lock->lock();
+ }
+ std::move(handler)(std::forward<Args>(args)...);
+ }
+};
+
+// deduction guide required by windows?
+template <typename Handler, typename BasicLockable>
+lock_handler(Handler&&, BasicLockable*) -> lock_handler<Handler, BasicLockable>;
+
+} // namespace detail
+
/// Captures a yield_context handler for deferred completion or cancellation.
template <typename Ret>
class yield_waiter {
if (slot.is_connected()) {
slot.template emplace<op_cancellation>(this);
}
- state.emplace(std::move(h));
+ constexpr std::unique_lock<std::mutex>* lock = nullptr;
+ state.emplace(std::move(h), lock);
+ }, token);
+ }
+
+ /// Suspends the given yield_context until the captured handler is invoked
+ /// via complete() or cancel(). The given lock is released immediately before
+ /// it suspends and reacquired immediately after it resumes.
+ template <typename CompletionToken>
+ auto async_wait(std::unique_lock<std::mutex>& lock, CompletionToken&& token)
+ {
+ return boost::asio::async_initiate<CompletionToken, Signature>(
+ [this, &lock] (handler_type h) {
+ auto slot = get_associated_cancellation_slot(h);
+ if (slot.is_connected()) {
+ slot.template emplace<op_cancellation>(this);
+ }
+ state.emplace(std::move(h), &lock);
+ lock.unlock(); // unlock before suspend
}, token);
}
{
auto s = std::move(*state);
state.reset();
- auto h = boost::asio::append(std::move(s.handler), ec, std::move(value));
- boost::asio::dispatch(std::move(h));
+ boost::asio::dispatch(
+ boost::asio::append(
+ detail::lock_handler{std::move(s.handler), s.lock},
+ ec, std::move(value)));
}
/// Destroy the completion handler.
struct handler_state {
handler_type handler;
work_guard work;
+ std::unique_lock<std::mutex>* lock = nullptr;
- explicit handler_state(handler_type&& h)
- : handler(std::move(h)),
- work(make_work_guard(handler))
+ handler_state(handler_type&& h, std::unique_lock<std::mutex>* lock)
+ : handler(std::move(h)), work(make_work_guard(handler)), lock(lock)
{}
};
std::optional<handler_state> state;
if (slot.is_connected()) {
slot.template emplace<op_cancellation>(this);
}
- state.emplace(std::move(h));
+ constexpr std::unique_lock<std::mutex>* lock = nullptr;
+ state.emplace(std::move(h), lock);
+ }, token);
+ }
+
+ /// Suspends the given yield_context until the captured handler is invoked
+ /// via complete() or cancel(). The given lock is released immediately before
+ /// it suspends and reacquired immediately after it resumes.
+ template <typename CompletionToken>
+ auto async_wait(std::unique_lock<std::mutex>& lock, CompletionToken&& token)
+ {
+ return boost::asio::async_initiate<CompletionToken, Signature>(
+ [this, &lock] (handler_type h) {
+ auto slot = get_associated_cancellation_slot(h);
+ if (slot.is_connected()) {
+ slot.template emplace<op_cancellation>(this);
+ }
+ state.emplace(std::move(h), &lock);
+ lock.unlock(); // unlock before suspend
}, token);
}
{
auto s = std::move(*state);
state.reset();
- boost::asio::dispatch(boost::asio::append(std::move(s.handler), ec));
+ boost::asio::dispatch(
+ boost::asio::append(
+ detail::lock_handler{std::move(s.handler), s.lock}, ec));
}
/// Destroy the completion handler.
struct handler_state {
handler_type handler;
work_guard work;
+ std::unique_lock<std::mutex>* lock = nullptr;
- explicit handler_state(handler_type&& h)
- : handler(std::move(h)),
- work(make_work_guard(handler))
+ handler_state(handler_type&& h, std::unique_lock<std::mutex>* lock)
+ : handler(std::move(h)), work(make_work_guard(handler)), lock(lock)
{}
};
std::optional<handler_state> state;
};
} // namespace ceph::async
+
+namespace boost::asio {
+
+// forward the handler's associated executor, allocator, cancellation slot, etc
+template <template <typename, typename> class Associator,
+ typename Handler, typename BasicLockable, typename DefaultCandidate>
+struct associator<Associator,
+ ceph::async::detail::lock_handler<Handler, BasicLockable>, DefaultCandidate>
+ : Associator<Handler, DefaultCandidate>
+{
+ static auto get(const ceph::async::detail::lock_handler<Handler, BasicLockable>& h) noexcept {
+ return Associator<Handler, DefaultCandidate>::get(h.handler);
+ }
+ static auto get(const ceph::async::detail::lock_handler<Handler, BasicLockable>& h,
+ const DefaultCandidate& c) noexcept {
+ return Associator<Handler, DefaultCandidate>::get(h.handler, c);
+ }
+};
+
+} // namespace boost::asio
}
}
+TEST(YieldWaiterVoid, wait_locked_complete)
+{
+ asio::io_context ctx;
+ yield_waiter<void> waiter;
+ std::optional<std::exception_ptr> eptr;
+
+ std::mutex mutex;
+ auto lock = std::unique_lock{mutex};
+
+ asio::spawn(ctx, [&waiter, &lock] (asio::yield_context yield) {
+ ASSERT_TRUE(lock.owns_lock());
+ waiter.async_wait(lock, yield);
+ EXPECT_TRUE(lock.owns_lock());
+ }, capture(eptr));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+
+ ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+ ASSERT_TRUE(waiter);
+ waiter.complete(error_code{});
+ EXPECT_FALSE(waiter);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(lock.owns_lock());
+}
+
+TEST(YieldWaiterVoid, wait_locked_error)
+{
+ asio::io_context ctx;
+ yield_waiter<void> waiter;
+ std::optional<std::exception_ptr> eptr;
+
+ std::mutex mutex;
+ auto lock = std::unique_lock{mutex};
+
+ asio::spawn(ctx, [&waiter, &lock] (asio::yield_context yield) {
+ ASSERT_TRUE(lock.owns_lock());
+ waiter.async_wait(lock, yield);
+ EXPECT_TRUE(lock.owns_lock());
+ }, capture(eptr));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+
+ ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+ ASSERT_TRUE(waiter);
+ waiter.complete(make_error_code(asio::error::operation_aborted));
+ EXPECT_FALSE(waiter);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(lock.owns_lock());
+ ASSERT_TRUE(eptr);
+ ASSERT_TRUE(*eptr);
+ try {
+ std::rethrow_exception(*eptr);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
TEST(YieldWaiterInt, wait_shutdown)
{
}
}
+TEST(YieldWaiterPtr, wait_locked_complete)
+{
+ asio::io_context ctx;
+ yield_waiter<std::unique_ptr<int>> waiter;
+ std::optional<std::unique_ptr<int>> result;
+
+ std::mutex mutex;
+ auto lock = std::unique_lock{mutex};
+
+ asio::spawn(ctx, [&waiter, &lock, &result] (asio::yield_context yield) {
+ ASSERT_TRUE(lock.owns_lock());
+ result = waiter.async_wait(lock, yield);
+ EXPECT_TRUE(lock.owns_lock());
+ }, rethrow);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+
+ ASSERT_TRUE(waiter);
+ waiter.complete(error_code{}, std::make_unique<int>(42));
+ EXPECT_FALSE(waiter);
+
+ ctx.poll();
+ EXPECT_TRUE(ctx.stopped());
+ ASSERT_TRUE(lock.owns_lock());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_EQ(42, **result);
+}
+
+TEST(YieldWaiterPtr, wait_locked_error)
+{
+ asio::io_context ctx;
+ yield_waiter<std::unique_ptr<int>> waiter;
+ std::optional<std::unique_ptr<int>> result;
+ std::optional<std::exception_ptr> eptr;
+
+ std::mutex mutex;
+ auto lock = std::unique_lock{mutex};
+
+ asio::spawn(ctx, [&waiter, &lock, &result] (asio::yield_context yield) {
+ ASSERT_TRUE(lock.owns_lock());
+ result = waiter.async_wait(lock, yield);
+ EXPECT_TRUE(lock.owns_lock());
+ }, capture(eptr));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(lock.owns_lock()); // unlocked while suspended
+
+ ASSERT_TRUE(waiter);
+ waiter.complete(make_error_code(std::errc::no_such_file_or_directory), nullptr);
+ EXPECT_FALSE(waiter);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(lock.owns_lock());
+ EXPECT_FALSE(result);
+ ASSERT_TRUE(eptr);
+ ASSERT_TRUE(*eptr);
+ try {
+ std::rethrow_exception(*eptr);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
void invoke_callback(int expected_reply, std::function<void(int)> cb) {
auto t = std::thread([cb, expected_reply] {
cb(expected_reply);