]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: Blocking completion for asio
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 28 Apr 2020 18:43:23 +0000 (14:43 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Fri, 15 May 2020 14:55:10 +0000 (10:55 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/async/blocked_completion.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_blocked_completion.cc [new file with mode: 0644]

diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h
new file mode 100644 (file)
index 0000000..633e01e
--- /dev/null
@@ -0,0 +1,273 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <type_traits>
+
+#include <boost/asio/async_result.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace ceph::async {
+
+namespace bs = boost::system;
+
+class use_blocked_t {
+  use_blocked_t(bs::error_code* ec) : ec(ec) {}
+public:
+  use_blocked_t() = default;
+
+  use_blocked_t operator [](bs::error_code& _ec) const {
+    return use_blocked_t(&_ec);
+  }
+
+  bs::error_code* ec = nullptr;
+};
+
+inline constexpr use_blocked_t use_blocked;
+
+namespace detail {
+
+template<typename... Ts>
+struct blocked_handler
+{
+  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+  void operator ()(Ts... values) noexcept {
+    std::scoped_lock l(*m);
+    *ec = bs::error_code{};
+    *value = std::forward_as_tuple(std::move(values)...);
+    *done = true;
+    cv->notify_one();
+  }
+
+  void operator ()(bs::error_code ec, Ts... values) noexcept {
+    std::scoped_lock l(*m);
+    *this->ec = ec;
+    *value = std::forward_as_tuple(std::move(values)...);
+    *done = true;
+    cv->notify_one();
+  }
+
+  bs::error_code* ec;
+  std::optional<std::tuple<Ts...>>* value = nullptr;
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+template<typename T>
+struct blocked_handler<T>
+{
+  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+  void operator ()(T value) noexcept {
+    std::scoped_lock l(*m);
+    *ec = bs::error_code();
+    *this->value = std::move(value);
+    *done = true;
+    cv->notify_one();
+  }
+
+  void operator ()(bs::error_code ec, T value) noexcept {
+    std::scoped_lock l(*m);
+    *this->ec = ec;
+    *this->value = std::move(value);
+    *done = true;
+    cv->notify_one();
+  }
+
+  //private:
+  bs::error_code* ec;
+  std::optional<T>* value;
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+template<>
+struct blocked_handler<void>
+{
+  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+  void operator ()() noexcept {
+    std::scoped_lock l(*m);
+    *ec = bs::error_code{};
+    *done = true;
+    cv->notify_one();
+  }
+
+  void operator ()(bs::error_code ec) noexcept {
+    std::scoped_lock l(*m);
+    *this->ec = ec;
+    *done = true;
+    cv->notify_one();
+  }
+
+  bs::error_code* ec;
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+
+template<typename... Ts>
+class blocked_result
+{
+public:
+  using completion_handler_type = blocked_handler<Ts...>;
+  using return_type = std::tuple<Ts...>;
+
+  explicit blocked_result(completion_handler_type& h) noexcept {
+    out_ec = h.ec;
+    if (!out_ec) h.ec = &ec;
+    h.value = &value;
+    h.m = &m;
+    h.cv = &cv;
+    h.done = &done;
+  }
+
+  return_type get() {
+    std::unique_lock l(m);
+    cv.wait(l, [this]() { return done; });
+    if (!out_ec && ec) throw bs::system_error(ec);
+    return std::move(*value);
+  }
+
+private:
+  bs::error_code* out_ec;
+  bs::error_code ec;
+  std::optional<return_type> value;
+  std::mutex m;
+  std::condition_variable cv;
+  bool done = false;
+};
+
+template<typename T>
+class blocked_result<T>
+{
+public:
+  using completion_handler_type = blocked_handler<T>;
+  using return_type = T;
+
+  explicit blocked_result(completion_handler_type& h) noexcept {
+    out_ec = h.ec;
+    if (!out_ec) h.ec = &ec;
+    h.value = &value;
+    h.m = &m;
+    h.cv = &cv;
+    h.done = &done;
+  }
+
+  return_type get() {
+    std::unique_lock l(m);
+    cv.wait(l, [this]() { return done; });
+    if (!out_ec && ec) throw bs::system_error(ec);
+    return std::move(*value);
+  }
+
+private:
+  bs::error_code* out_ec;
+  bs::error_code ec;
+  std::optional<return_type> value;
+  std::mutex m;
+  std::condition_variable cv;
+  bool done = false;
+};
+
+template<>
+class blocked_result<void>
+{
+public:
+  using completion_handler_type = blocked_handler<void>;
+  using return_type = void;
+
+  explicit blocked_result(completion_handler_type& h) noexcept {
+    out_ec = h.ec;
+    if (!out_ec) h.ec = &ec;
+    h.m = &m;
+    h.cv = &cv;
+    h.done = &done;
+  }
+
+  void get() {
+    std::unique_lock l(m);
+    cv.wait(l, [this]() { return done; });
+    if (!out_ec && ec) throw bs::system_error(ec);
+  }
+
+private:
+  bs::error_code* out_ec;
+  bs::error_code ec;
+  std::mutex m;
+  std::condition_variable cv;
+  bool done = false;
+};
+} // namespace detail
+} // namespace ceph::async
+
+
+namespace boost::asio {
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t, ReturnType()>
+  : public ceph::async::detail::blocked_result<void>
+{
+public:
+  explicit async_result(typename ceph::async::detail::blocked_result<void>
+                       ::completion_handler_type& h)
+    : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
+  : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+  explicit async_result(
+    typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+    : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t,
+                  ReturnType(boost::system::error_code)>
+  : public ceph::async::detail::blocked_result<void>
+{
+public:
+  explicit async_result(
+    typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
+    : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t,
+                  ReturnType(boost::system::error_code, Args...)>
+  : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+  explicit async_result(
+    typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+    : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+}
+
+#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
index dbb2f63b0a9edb36890e1afb9001d14ec7b25024..93a01347838a7663629e7b10c97d63c1a2159c5f 100644 (file)
@@ -337,3 +337,7 @@ add_ceph_unittest(unittest_rabin_chunk)
 add_executable(unittest_ceph_timer test_ceph_timer.cc)
 target_link_libraries(unittest_rabin_chunk GTest::GTest)
 add_ceph_unittest(unittest_ceph_timer)
+
+add_executable(unittest_blocked_completion test_blocked_completion.cc)
+add_ceph_unittest(unittest_blocked_completion)
+target_link_libraries(unittest_blocked_completion Boost::system GTest::GTest)
diff --git a/src/test/common/test_blocked_completion.cc b/src/test/common/test_blocked_completion.cc
new file mode 100644 (file)
index 0000000..71e5784
--- /dev/null
@@ -0,0 +1,237 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+
+#include <gtest/gtest.h>
+
+#include "common/async/bind_handler.h"
+#include "common/async/blocked_completion.h"
+#include "common/async/forward_handler.h"
+
+using namespace std::literals;
+
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
+class context_thread {
+  ba::io_context c;
+  ba::executor_work_guard<ba::io_context::executor_type> guard;
+  std::thread th;
+
+public:
+  context_thread() noexcept
+    : guard(ba::make_work_guard(c)),
+      th([this]() noexcept { c.run();}) {}
+
+  ~context_thread() {
+    guard.reset();
+    th.join();
+  }
+
+  ba::io_context& io_context() noexcept {
+    return c;
+  }
+
+  ba::io_context::executor_type get_executor() noexcept {
+    return c.get_executor();
+  }
+};
+
+struct move_only {
+  move_only() = default;
+  move_only(move_only&&) = default;
+  move_only& operator=(move_only&&) = default;
+  move_only(const move_only&) = delete;
+  move_only& operator=(const move_only&) = delete;
+};
+
+struct defaultless {
+  int a;
+  defaultless(int a) : a(a) {}
+};
+
+template<typename Executor, typename CompletionToken, typename... Args>
+auto id(const Executor& executor, CompletionToken&& token,
+       Args&& ...args)
+{
+  ba::async_completion<CompletionToken, void(Args...)> init(token);
+  auto a = ba::get_associated_allocator(init.completion_handler);
+  executor.post(ca::forward_handler(
+                 ca::bind_handler(std::move(init.completion_handler),
+                                  std::forward<Args>(args)...)),
+               a);
+  return init.result.get();
+}
+
+TEST(BlockedCompletion, Void)
+{
+  context_thread t;
+
+  ba::post(t.get_executor(), ca::use_blocked);
+}
+
+TEST(BlockedCompletion, Timer)
+{
+  context_thread t;
+  ba::steady_timer timer(t.io_context(), 50ms);
+  timer.async_wait(ca::use_blocked);
+}
+
+TEST(BlockedCompletion, NoError)
+{
+  context_thread t;
+  ba::steady_timer timer(t.io_context(), 1s);
+  bs::error_code ec;
+
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{}));
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}));
+  EXPECT_FALSE(ec);
+
+  int i;
+  EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked,
+                        bs::error_code{}, 5));
+  ASSERT_EQ(5, i);
+  EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec],
+                        bs::error_code{}, 7));
+  EXPECT_FALSE(ec);
+  ASSERT_EQ(7, i);
+
+  float j;
+
+  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9,
+                                     3.5));
+  ASSERT_EQ(9, i);
+  ASSERT_EQ(3.5, j);
+  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec],
+                                     11, 2.25));
+  EXPECT_FALSE(ec);
+  ASSERT_EQ(11, i);
+  ASSERT_EQ(2.25, j);
+}
+
+TEST(BlockedCompletion, AnError)
+{
+  context_thread t;
+  ba::steady_timer timer(t.io_context(), 1s);
+  bs::error_code ec;
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()}));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, 5),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()}, 5));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, 5, 3),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()}, 5, 3));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}
+
+TEST(BlockedCompletion, MoveOnly)
+{
+  context_thread t;
+  ba::steady_timer timer(t.io_context(), 1s);
+  bs::error_code ec;
+
+
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked,
+                        bs::error_code{}, move_only{}));
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{}, move_only{}));
+  EXPECT_FALSE(ec);
+
+  {
+    auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5);
+    EXPECT_EQ(j, 5);
+  }
+  {
+    auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5);
+    EXPECT_EQ(j, 5);
+  }
+  EXPECT_FALSE(ec);
+
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, move_only{}),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()}, move_only{}));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()},
+                    move_only{}, 3));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}
+
+TEST(BlockedCompletion, DefaultLess)
+{
+  context_thread t;
+  ba::steady_timer timer(t.io_context(), 1s);
+  bs::error_code ec;
+
+
+  {
+    auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5});
+    EXPECT_EQ(5, l.a);
+  }
+  {
+    auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7});
+    EXPECT_EQ(7, l.a);
+  }
+
+  {
+    auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5);
+    EXPECT_EQ(i.a, 3);
+    EXPECT_EQ(j, 5);
+  }
+  {
+    auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5);
+    EXPECT_EQ(i.a, 3);
+    EXPECT_EQ(j, 5);
+  }
+  EXPECT_FALSE(ec);
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, move_only{}),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()}, move_only{}));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
+                 bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
+              bs::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
+                    bs::error_code{EDOM, bs::system_category()},
+                    move_only{}, 3));
+  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+}