--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <type_traits>
+
+#include <boost/asio/async_result.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace ceph::async {
+
+namespace bs = boost::system;
+
+class use_blocked_t {
+ use_blocked_t(bs::error_code* ec) : ec(ec) {}
+public:
+ use_blocked_t() = default;
+
+ use_blocked_t operator [](bs::error_code& _ec) const {
+ return use_blocked_t(&_ec);
+ }
+
+ bs::error_code* ec = nullptr;
+};
+
+inline constexpr use_blocked_t use_blocked;
+
+namespace detail {
+
+template<typename... Ts>
+struct blocked_handler
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::optional<std::tuple<Ts...>>* value = nullptr;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<typename T>
+struct blocked_handler<T>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(T value) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code();
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, T value) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ //private:
+ bs::error_code* ec;
+ std::optional<T>* value;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<>
+struct blocked_handler<void>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()() noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+
+template<typename... Ts>
+class blocked_result
+{
+public:
+ using completion_handler_type = blocked_handler<Ts...>;
+ using return_type = std::tuple<Ts...>;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<typename T>
+class blocked_result<T>
+{
+public:
+ using completion_handler_type = blocked_handler<T>;
+ using return_type = T;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<>
+class blocked_result<void>
+{
+public:
+ using completion_handler_type = blocked_handler<void>;
+ using return_type = void;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ void get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ }
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+} // namespace detail
+} // namespace ceph::async
+
+
+namespace boost::asio {
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t, ReturnType()>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(typename ceph::async::detail::blocked_result<void>
+ ::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code)>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code, Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+}
+
+#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+
+#include <gtest/gtest.h>
+
+#include "common/async/bind_handler.h"
+#include "common/async/blocked_completion.h"
+#include "common/async/forward_handler.h"
+
+using namespace std::literals;
+
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
+class context_thread {
+ ba::io_context c;
+ ba::executor_work_guard<ba::io_context::executor_type> guard;
+ std::thread th;
+
+public:
+ context_thread() noexcept
+ : guard(ba::make_work_guard(c)),
+ th([this]() noexcept { c.run();}) {}
+
+ ~context_thread() {
+ guard.reset();
+ th.join();
+ }
+
+ ba::io_context& io_context() noexcept {
+ return c;
+ }
+
+ ba::io_context::executor_type get_executor() noexcept {
+ return c.get_executor();
+ }
+};
+
+struct move_only {
+ move_only() = default;
+ move_only(move_only&&) = default;
+ move_only& operator=(move_only&&) = default;
+ move_only(const move_only&) = delete;
+ move_only& operator=(const move_only&) = delete;
+};
+
+struct defaultless {
+ int a;
+ defaultless(int a) : a(a) {}
+};
+
+template<typename Executor, typename CompletionToken, typename... Args>
+auto id(const Executor& executor, CompletionToken&& token,
+ Args&& ...args)
+{
+ ba::async_completion<CompletionToken, void(Args...)> init(token);
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ executor.post(ca::forward_handler(
+ ca::bind_handler(std::move(init.completion_handler),
+ std::forward<Args>(args)...)),
+ a);
+ return init.result.get();
+}
+
+TEST(BlockedCompletion, Void)
+{
+ context_thread t;
+
+ ba::post(t.get_executor(), ca::use_blocked);
+}
+
+TEST(BlockedCompletion, Timer)
+{
+ context_thread t;
+ ba::steady_timer timer(t.io_context(), 50ms);
+ timer.async_wait(ca::use_blocked);
+}
+
+TEST(BlockedCompletion, NoError)
+{
+ context_thread t;
+ ba::steady_timer timer(t.io_context(), 1s);
+ bs::error_code ec;
+
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{}));
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}));
+ EXPECT_FALSE(ec);
+
+ int i;
+ EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked,
+ bs::error_code{}, 5));
+ ASSERT_EQ(5, i);
+ EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{}, 7));
+ EXPECT_FALSE(ec);
+ ASSERT_EQ(7, i);
+
+ float j;
+
+ EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9,
+ 3.5));
+ ASSERT_EQ(9, i);
+ ASSERT_EQ(3.5, j);
+ EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec],
+ 11, 2.25));
+ EXPECT_FALSE(ec);
+ ASSERT_EQ(11, i);
+ ASSERT_EQ(2.25, j);
+}
+
+TEST(BlockedCompletion, AnError)
+{
+ context_thread t;
+ ba::steady_timer timer(t.io_context(), 1s);
+ bs::error_code ec;
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()}));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, 5),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()}, 5));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, 5, 3),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()}, 5, 3));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}
+
+TEST(BlockedCompletion, MoveOnly)
+{
+ context_thread t;
+ ba::steady_timer timer(t.io_context(), 1s);
+ bs::error_code ec;
+
+
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{}, move_only{}));
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{}, move_only{}));
+ EXPECT_FALSE(ec);
+
+ {
+ auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5);
+ EXPECT_EQ(j, 5);
+ }
+ {
+ auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5);
+ EXPECT_EQ(j, 5);
+ }
+ EXPECT_FALSE(ec);
+
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, move_only{}),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()}, move_only{}));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()},
+ move_only{}, 3));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}
+
+TEST(BlockedCompletion, DefaultLess)
+{
+ context_thread t;
+ ba::steady_timer timer(t.io_context(), 1s);
+ bs::error_code ec;
+
+
+ {
+ auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5});
+ EXPECT_EQ(5, l.a);
+ }
+ {
+ auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7});
+ EXPECT_EQ(7, l.a);
+ }
+
+ {
+ auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5);
+ EXPECT_EQ(i.a, 3);
+ EXPECT_EQ(j, 5);
+ }
+ {
+ auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5);
+ EXPECT_EQ(i.a, 3);
+ EXPECT_EQ(j, 5);
+ }
+ EXPECT_FALSE(ec);
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, move_only{}),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()}, move_only{}));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+ bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
+ bs::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+ bs::error_code{EDOM, bs::system_category()},
+ move_only{}, 3));
+ EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}