From caf6ce2ac2dd0808ffec7195cdfef410bb6da854 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 28 Apr 2020 14:43:23 -0400 Subject: [PATCH] common/async: Blocking completion for asio Signed-off-by: Adam C. Emerson --- src/common/async/blocked_completion.h | 273 +++++++++++++++++++++ src/test/common/CMakeLists.txt | 4 + src/test/common/test_blocked_completion.cc | 237 ++++++++++++++++++ 3 files changed, 514 insertions(+) create mode 100644 src/common/async/blocked_completion.h create mode 100644 src/test/common/test_blocked_completion.cc diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h new file mode 100644 index 0000000000000..633e01ecf5eb9 --- /dev/null +++ b/src/common/async/blocked_completion.h @@ -0,0 +1,273 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H +#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H + +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace ceph::async { + +namespace bs = boost::system; + +class use_blocked_t { + use_blocked_t(bs::error_code* ec) : ec(ec) {} +public: + use_blocked_t() = default; + + use_blocked_t operator [](bs::error_code& _ec) const { + return use_blocked_t(&_ec); + } + + bs::error_code* ec = nullptr; +}; + +inline constexpr use_blocked_t use_blocked; + +namespace detail { + +template +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(Ts... values) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, Ts... values) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::optional>* value = nullptr; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(T value) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code(); + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, T value) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + //private: + bs::error_code* ec; + std::optional* value; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template<> +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()() noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + + +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = std::tuple; + + explicit blocked_result(completion_handler_type& h) noexcept { + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = T; + + explicit blocked_result(completion_handler_type& h) noexcept { + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template<> +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = void; + + explicit blocked_result(completion_handler_type& h) noexcept { + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + void get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + } + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; +} // namespace detail +} // namespace ceph::async + + +namespace boost::asio { +template +class async_result + : public ceph::async::detail::blocked_result +{ +public: + explicit async_result(typename ceph::async::detail::blocked_result + ::completion_handler_type& h) + : ceph::async::detail::blocked_result(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result...>::completion_handler_type& h) + : ceph::async::detail::blocked_result...>(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result::completion_handler_type& h) + : ceph::async::detail::blocked_result(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result...>::completion_handler_type& h) + : ceph::async::detail::blocked_result...>(h) {} +}; +} + +#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index dbb2f63b0a9ed..93a01347838a7 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -337,3 +337,7 @@ add_ceph_unittest(unittest_rabin_chunk) add_executable(unittest_ceph_timer test_ceph_timer.cc) target_link_libraries(unittest_rabin_chunk GTest::GTest) add_ceph_unittest(unittest_ceph_timer) + +add_executable(unittest_blocked_completion test_blocked_completion.cc) +add_ceph_unittest(unittest_blocked_completion) +target_link_libraries(unittest_blocked_completion Boost::system GTest::GTest) diff --git a/src/test/common/test_blocked_completion.cc b/src/test/common/test_blocked_completion.cc new file mode 100644 index 0000000000000..71e5784af7e4a --- /dev/null +++ b/src/test/common/test_blocked_completion.cc @@ -0,0 +1,237 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include +#include + +#include + +#include "common/async/bind_handler.h" +#include "common/async/blocked_completion.h" +#include "common/async/forward_handler.h" + +using namespace std::literals; + +namespace ba = boost::asio; +namespace bs = boost::system; +namespace ca = ceph::async; + +class context_thread { + ba::io_context c; + ba::executor_work_guard guard; + std::thread th; + +public: + context_thread() noexcept + : guard(ba::make_work_guard(c)), + th([this]() noexcept { c.run();}) {} + + ~context_thread() { + guard.reset(); + th.join(); + } + + ba::io_context& io_context() noexcept { + return c; + } + + ba::io_context::executor_type get_executor() noexcept { + return c.get_executor(); + } +}; + +struct move_only { + move_only() = default; + move_only(move_only&&) = default; + move_only& operator=(move_only&&) = default; + move_only(const move_only&) = delete; + move_only& operator=(const move_only&) = delete; +}; + +struct defaultless { + int a; + defaultless(int a) : a(a) {} +}; + +template +auto id(const Executor& executor, CompletionToken&& token, + Args&& ...args) +{ + ba::async_completion init(token); + auto a = ba::get_associated_allocator(init.completion_handler); + executor.post(ca::forward_handler( + ca::bind_handler(std::move(init.completion_handler), + std::forward(args)...)), + a); + return init.result.get(); +} + +TEST(BlockedCompletion, Void) +{ + context_thread t; + + ba::post(t.get_executor(), ca::use_blocked); +} + +TEST(BlockedCompletion, Timer) +{ + context_thread t; + ba::steady_timer timer(t.io_context(), 50ms); + timer.async_wait(ca::use_blocked); +} + +TEST(BlockedCompletion, NoError) +{ + context_thread t; + ba::steady_timer timer(t.io_context(), 1s); + bs::error_code ec; + + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{})); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{})); + EXPECT_FALSE(ec); + + int i; + EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked, + bs::error_code{}, 5)); + ASSERT_EQ(5, i); + EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{}, 7)); + EXPECT_FALSE(ec); + ASSERT_EQ(7, i); + + float j; + + EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9, + 3.5)); + ASSERT_EQ(9, i); + ASSERT_EQ(3.5, j); + EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec], + 11, 2.25)); + EXPECT_FALSE(ec); + ASSERT_EQ(11, i); + ASSERT_EQ(2.25, j); +} + +TEST(BlockedCompletion, AnError) +{ + context_thread t; + ba::steady_timer timer(t.io_context(), 1s); + bs::error_code ec; + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()})); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, 5), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, 5)); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, 5, 3), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, 5, 3)); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); +} + +TEST(BlockedCompletion, MoveOnly) +{ + context_thread t; + ba::steady_timer timer(t.io_context(), 1s); + bs::error_code ec; + + + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{}, move_only{})); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{}, move_only{})); + EXPECT_FALSE(ec); + + { + auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5); + EXPECT_EQ(j, 5); + } + { + auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5); + EXPECT_EQ(j, 5); + } + EXPECT_FALSE(ec); + + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, move_only{}), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, move_only{})); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, move_only{}, 3), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, + move_only{}, 3)); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); +} + +TEST(BlockedCompletion, DefaultLess) +{ + context_thread t; + ba::steady_timer timer(t.io_context(), 1s); + bs::error_code ec; + + + { + auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5}); + EXPECT_EQ(5, l.a); + } + { + auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7}); + EXPECT_EQ(7, l.a); + } + + { + auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5); + EXPECT_EQ(i.a, 3); + EXPECT_EQ(j, 5); + } + { + auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5); + EXPECT_EQ(i.a, 3); + EXPECT_EQ(j, 5); + } + EXPECT_FALSE(ec); + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, move_only{}), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, move_only{})); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), ca::use_blocked, + bs::error_code{EDOM, bs::system_category()}, move_only{}, 3), + bs::system_error); + EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], + bs::error_code{EDOM, bs::system_category()}, + move_only{}, 3)); + EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); +} -- 2.39.5