--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_BIND_HANDLER_H
+#define CEPH_ASYNC_BIND_HANDLER_H
+
+#include <tuple>
+#include <boost/asio.hpp>
+
+namespace ceph::async {
+
+/**
+ * A bound completion handler for use with boost::asio.
+ *
+ * A completion handler wrapper that allows a tuple of arguments to be forwarded
+ * to the original Handler. This is intended for use with boost::asio functions
+ * like defer(), dispatch() and post() which expect handlers which are callable
+ * with no arguments.
+ *
+ * The original Handler's associated allocator and executor are maintained.
+ *
+ * @see bind_handler
+ */
+template <typename Handler, typename Tuple>
+struct CompletionHandler {
+ Handler handler;
+ Tuple args;
+
+ CompletionHandler(Handler&& handler, Tuple&& args)
+ : handler(std::move(handler)),
+ args(std::move(args))
+ {}
+
+ void operator()() & {
+ std::apply(handler, args);
+ }
+ void operator()() const & {
+ std::apply(handler, args);
+ }
+ void operator()() && {
+ std::apply(std::move(handler), std::move(args));
+ }
+
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(handler);
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// specialize boost::asio::associated_executor<> for CompletionHandler
+template <typename Handler, typename Tuple, typename Executor>
+struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> {
+ using type = boost::asio::associated_executor_t<Handler, Executor>;
+
+ static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler,
+ const Executor& ex = Executor()) noexcept {
+ return boost::asio::get_associated_executor(handler.handler, ex);
+ }
+};
+
+} // namespace boost::asio
+
+namespace ceph::async {
+
+/**
+ * Returns a wrapped completion handler with bound arguments.
+ *
+ * Binds the given arguments to a handler, and returns a CompletionHandler that
+ * is callable with no arguments. This is similar to std::bind(), except that
+ * all arguments must be provided. Move-only argument types are supported as
+ * long as the CompletionHandler's 'operator() &&' overload is used, i.e.
+ * std::move(handler)().
+ *
+ * Example use:
+ *
+ * // bind the arguments (5, "hello") to a callback lambda:
+ * auto callback = [] (int a, std::string b) {};
+ * auto handler = bind_handler(callback, 5, "hello");
+ *
+ * // execute the bound handler on an io_context:
+ * boost::asio::io_context context;
+ * boost::asio::post(context, std::move(handler));
+ * context.run();
+ *
+ * @see CompletionHandler
+ */
+template <typename Handler, typename ...Args>
+auto bind_handler(Handler&& h, Args&& ...args)
+{
+ return CompletionHandler{std::forward<Handler>(h),
+ std::make_tuple(std::forward<Args>(args)...)};
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_BIND_HANDLER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_COMPLETION_H
+#define CEPH_ASYNC_COMPLETION_H
+
+#include <memory>
+
+#include "bind_handler.h"
+
+namespace ceph::async {
+
+/**
+ * Abstract completion handler interface for use with boost::asio.
+ *
+ * Memory management is performed using the Handler's 'associated allocator',
+ * which carries the additional requirement that its memory be released before
+ * the Handler is invoked. This allows memory allocated for one asynchronous
+ * operation to be reused in its continuation. Because of this requirement, any
+ * calls to invoke the completion must first release ownership of it. To enforce
+ * this, the static functions defer()/dispatch()/post() take the completion by
+ * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion).
+ *
+ * Handlers may also have an 'associated executor', so the calls to defer(),
+ * dispatch(), and post() are forwarded to that executor. If there is no
+ * associated executor (which is generally the case unless one was bound with
+ * boost::asio::bind_executor()), the executor passed to Completion::create()
+ * is used as a default.
+ *
+ * Example use:
+ *
+ * // declare a Completion type with Signature = void(int, string)
+ * using MyCompletion = ceph::async::Completion<void(int, string)>;
+ *
+ * // create a completion with the given callback:
+ * std::unique_ptr<MyCompletion> c;
+ * c = MyCompletion::create(ex, [] (int a, const string& b) {});
+ *
+ * // bind arguments to the callback and post to its associated executor:
+ * MyCompletion::post(std::move(c), 5, "hello");
+ *
+ *
+ * Additional user data may be stored along with the Completion to take
+ * advantage of the handler allocator optimization. This is accomplished by
+ * specifying its type in the template parameter T. For example, the type
+ * Completion<void(), int> contains a public member variable 'int user_data'.
+ * Any additional arguments to Completion::create() will be forwarded to type
+ * T's constructor.
+ *
+ * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>,
+ * the Completion will inherit from T instead of declaring it as a member
+ * variable.
+ *
+ * When invoking the completion handler via defer(), dispatch(), or post(),
+ * care must be taken when passing arguments that refer to user data, because
+ * its memory is destroyed prior to invocation. In such cases, the user data
+ * should be moved/copied out of the Completion first.
+ */
+template <typename Signature, typename T = void>
+class Completion;
+
+
+/// type tag for UserData
+template <typename T> struct AsBase {};
+
+namespace detail {
+
+/// optional user data to be stored with the Completion
+template <typename T>
+struct UserData {
+ T user_data;
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : user_data(std::forward<Args>(args)...)
+ {}
+};
+// AsBase specialization inherits from T
+template <typename T>
+struct UserData<AsBase<T>> : public T {
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : T(std::forward<Args>(args)...)
+ {}
+};
+// void specialization
+template <>
+class UserData<void> {};
+
+} // namespace detail
+
+
+// template specialization to pull the Signature's args apart
+template <typename T, typename ...Args>
+class Completion<void(Args...), T> : public detail::UserData<T> {
+ protected:
+ // internal interfaces for type-erasure on the Handler/Executor. uses
+ // tuple<Args...> to provide perfect forwarding because you can't make
+ // virtual function templates
+ virtual void destroy_defer(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_post(std::tuple<Args...>&& args) = 0;
+ virtual void destroy() = 0;
+
+ // constructor is protected, use create(). any constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ Completion(TArgs&& ...args)
+ : detail::UserData<T>(std::forward<TArgs>(args)...)
+ {}
+ public:
+ virtual ~Completion() = default;
+
+ // use the virtual destroy() interface on delete. this allows the derived
+ // class to manage its memory using Handler allocators, without having to use
+ // a custom Deleter for std::unique_ptr<>
+ static void operator delete(void *p) {
+ static_cast<Completion*>(p)->destroy();
+ }
+
+ /// completion factory function that uses the handler's associated allocator.
+ /// any additional arguments are forwared to T's constructor
+ template <typename Executor1, typename Handler, typename ...TArgs>
+ static std::unique_ptr<Completion>
+ create(const Executor1& ex1, Handler&& handler, TArgs&& ...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then defer() it on its associated executor
+ template <typename ...Args2>
+ static void defer(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then dispatch() it on its associated executor
+ template <typename ...Args2>
+ static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then post() it to its associated executor
+ template <typename ...Args2>
+ static void post(std::unique_ptr<Completion>&& c, Args2&&...args);
+};
+
+namespace detail {
+
+// concrete Completion that knows how to invoke the completion handler. this
+// observes all of the 'Requirements on asynchronous operations' specified by
+// the C++ Networking TS
+template <typename Executor1, typename Handler, typename T, typename ...Args>
+class CompletionImpl final : public Completion<void(Args...), T> {
+ // use Handler's associated executor (or Executor1 by default) for callbacks
+ using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>;
+ // maintain work on both executors
+ using Work1 = boost::asio::executor_work_guard<Executor1>;
+ using Work2 = boost::asio::executor_work_guard<Executor2>;
+ std::pair<Work1, Work2> work;
+ Handler handler;
+
+ // use Handler's associated allocator
+ using Alloc2 = boost::asio::associated_allocator_t<Handler>;
+ using Traits2 = std::allocator_traits<Alloc2>;
+ using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>;
+ using RebindTraits2 = std::allocator_traits<RebindAlloc2>;
+
+ // placement new for the handler allocator
+ static void* operator new(size_t, RebindAlloc2 alloc2) {
+ return RebindTraits2::allocate(alloc2, 1);
+ }
+ // placement delete for when the constructor throws during placement new
+ static void operator delete(void *p, RebindAlloc2 alloc2) {
+ RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1);
+ }
+
+ void destroy_defer(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = CompletionHandler{std::move(handler), std::move(args)};
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().defer(std::move(f), alloc2);
+ }
+ void destroy_dispatch(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = CompletionHandler{std::move(handler), std::move(args)};
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().dispatch(std::move(f), alloc2);
+ }
+ void destroy_post(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = CompletionHandler{std::move(handler), std::move(args)};
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().post(std::move(f), alloc2);
+ }
+ void destroy() override {
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ }
+
+ // constructor is private, use create(). extra constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args)
+ : Completion<void(Args...), T>(std::forward<TArgs>(args)...),
+ work(ex1, boost::asio::make_work_guard(handler, ex1)),
+ handler(std::move(handler))
+ {}
+
+ public:
+ template <typename ...TArgs>
+ static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) {
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ using Ptr = std::unique_ptr<CompletionImpl>;
+ return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler),
+ std::forward<TArgs>(args)...)};
+ }
+
+ static void operator delete(void *p) {
+ static_cast<CompletionImpl*>(p)->destroy();
+ }
+};
+
+} // namespace detail
+
+
+template <typename T, typename ...Args>
+template <typename Executor1, typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<void(Args...), T>>
+Completion<void(Args...), T>::create(const Executor1& ex,
+ Handler&& handler, TArgs&& ...args)
+{
+ using Impl = detail::CompletionImpl<Executor1, Handler, T, Args...>;
+ return Impl::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_defer(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_dispatch(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_post(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+
+/// completion factory function that uses the handler's associated allocator.
+/// any additional arguments are forwared to T's constructor
+template <typename Signature, typename T, typename Executor1,
+ typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<Signature, T>>
+create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args)
+{
+ return Completion<Signature, T>::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then defer() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::defer(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then dispatch() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::dispatch(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then post() it to its associated executor
+template <typename Signature, typename T, typename ...Args>
+void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::post(std::move(ptr), std::forward<Args>(args)...);
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_COMPLETION_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/completion.h"
+#include <optional>
+#include <boost/intrusive/list.hpp>
+#include <gtest/gtest.h>
+
+namespace ceph::async {
+
+using boost::system::error_code;
+
+struct move_only {
+ move_only() = default;
+ move_only(move_only&&) = default;
+ move_only& operator=(move_only&&) = default;
+ move_only(const move_only&) = delete;
+ move_only& operator=(const move_only&) = delete;
+};
+
+TEST(AsyncCompletion, BindHandler)
+{
+ auto h1 = [] (int i, char c) {};
+ auto b1 = bind_handler(std::move(h1), 5, 'a');
+ b1();
+ const auto& c1 = b1;
+ c1();
+ std::move(b1)();
+
+ // move-only types can be forwarded with 'operator() &&'
+ auto h2 = [] (move_only&& m) {};
+ auto b2 = bind_handler(std::move(h2), move_only{});
+ std::move(b2)();
+
+ // references bound with std::ref() and passed to all operator() overloads
+ auto h3 = [] (int& c) { c++; };
+ int count = 0;
+ auto b3 = bind_handler(std::move(h3), std::ref(count));
+ EXPECT_EQ(0, count);
+ b3();
+ EXPECT_EQ(1, count);
+ const auto& c3 = b3;
+ c3();
+ EXPECT_EQ(2, count);
+ std::move(b3)();
+ EXPECT_EQ(3, count);
+}
+
+TEST(AsyncCompletion, MoveOnly)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ std::optional<error_code> ec1, ec2;
+ {
+ // move-only user data
+ using Completion = Completion<void(error_code), move_only>;
+ auto c = Completion::create(ex1, [&ec1] (error_code ec) { ec1 = ec; });
+ Completion::post(std::move(c), boost::asio::error::operation_aborted);
+ EXPECT_FALSE(ec1);
+ }
+ {
+ // move-only handler
+ using Completion = Completion<void(error_code)>;
+ auto c = Completion::create(ex1, [&ec2, m=move_only{}] (error_code ec) { ec2 = ec; });
+ Completion::post(std::move(c), boost::asio::error::operation_aborted);
+ EXPECT_FALSE(ec2);
+ }
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+
+ ASSERT_TRUE(ec1);
+ EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
+ ASSERT_TRUE(ec2);
+ EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
+}
+
+TEST(AsyncCompletion, VoidCompletion)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ using Completion = Completion<void(error_code)>;
+ std::optional<error_code> ec1;
+
+ auto c = Completion::create(ex1, [&ec1] (error_code ec) { ec1 = ec; });
+ Completion::post(std::move(c), boost::asio::error::operation_aborted);
+
+ EXPECT_FALSE(ec1);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+
+ ASSERT_TRUE(ec1);
+ EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
+}
+
+TEST(AsyncCompletion, CompletionList)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ using T = AsBase<boost::intrusive::list_base_hook<>>;
+ using Completion = Completion<void(), T>;
+ boost::intrusive::list<Completion> completions;
+ int completed = 0;
+ for (int i = 0; i < 3; i++) {
+ auto c = Completion::create(ex1, [&] { completed++; });
+ completions.push_back(*c.release());
+ }
+ completions.clear_and_dispose([] (Completion *c) {
+ Completion::post(std::unique_ptr<Completion>{c});
+ });
+
+ EXPECT_EQ(0, completed);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+
+ EXPECT_EQ(3, completed);
+}
+
+TEST(AsyncCompletion, CompletionPair)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ using T = std::pair<int, std::string>;
+ using Completion = Completion<void(int, std::string), T>;
+
+ std::optional<T> t;
+ auto c = Completion::create(ex1, [&] (int first, std::string second) {
+ t = T{first, std::move(second)};
+ }, 2, "hello");
+
+ auto data = std::move(c->user_data);
+ Completion::post(std::move(c), data.first, std::move(data.second));
+
+ EXPECT_FALSE(t);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+
+ ASSERT_TRUE(t);
+ EXPECT_EQ(2, t->first);
+ EXPECT_EQ("hello", t->second);
+}
+
+TEST(AsyncCompletion, CompletionReference)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ using Completion = Completion<void(int&)>;
+
+ auto c = Completion::create(ex1, [] (int& i) { ++i; });
+
+ int i = 42;
+ Completion::post(std::move(c), std::ref(i));
+
+ EXPECT_EQ(42, i);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+
+ EXPECT_EQ(43, i);
+}
+
+struct throws_on_move {
+ throws_on_move() = default;
+ throws_on_move(throws_on_move&&) {
+ throw std::runtime_error("oops");
+ }
+ throws_on_move& operator=(throws_on_move&&) {
+ throw std::runtime_error("oops");
+ }
+ throws_on_move(const throws_on_move&) = default;
+ throws_on_move& operator=(const throws_on_move&) = default;
+};
+
+TEST(AsyncCompletion, ThrowOnCtor)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+ {
+ using Completion = Completion<void(int&)>;
+
+ // throw on Handler move construction
+ EXPECT_THROW(Completion::create(ex1, [t=throws_on_move{}] (int& i) { ++i; }),
+ std::runtime_error);
+ }
+ {
+ using T = throws_on_move;
+ using Completion = Completion<void(int&), T>;
+
+ // throw on UserData construction
+ EXPECT_THROW(Completion::create(ex1, [] (int& i) { ++i; }, throws_on_move{}),
+ std::runtime_error);
+ }
+}
+
+TEST(AsyncCompletion, FreeFunctions)
+{
+ boost::asio::io_context context;
+ auto ex1 = context.get_executor();
+
+ auto c1 = create_completion<void(), void>(ex1, [] {});
+ post(std::move(c1));
+
+ auto c2 = create_completion<void(int), int>(ex1, [] (int) {}, 5);
+ defer(std::move(c2), c2->user_data);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+}
+
+} // namespace ceph::async