From: Casey Bodley Date: Sat, 17 Mar 2018 03:25:55 +0000 (-0400) Subject: common: add a generic async Completion interface X-Git-Tag: v14.0.0~173^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4b7ca701c7ca34e0b0b675ee468a59dbde70f061;p=ceph.git common: add a generic async Completion interface a generic Completion for storing asio completion handlers for deferred execution Signed-off-by: Casey Bodley --- diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h new file mode 100644 index 000000000000..516d8a5e8b41 --- /dev/null +++ b/src/common/async/bind_handler.h @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_BIND_HANDLER_H +#define CEPH_ASYNC_BIND_HANDLER_H + +#include +#include + +namespace ceph::async { + +/** + * A bound completion handler for use with boost::asio. + * + * A completion handler wrapper that allows a tuple of arguments to be forwarded + * to the original Handler. This is intended for use with boost::asio functions + * like defer(), dispatch() and post() which expect handlers which are callable + * with no arguments. + * + * The original Handler's associated allocator and executor are maintained. + * + * @see bind_handler + */ +template +struct CompletionHandler { + Handler handler; + Tuple args; + + CompletionHandler(Handler&& handler, Tuple&& args) + : handler(std::move(handler)), + args(std::move(args)) + {} + + void operator()() & { + std::apply(handler, args); + } + void operator()() const & { + std::apply(handler, args); + } + void operator()() && { + std::apply(std::move(handler), std::move(args)); + } + + using allocator_type = boost::asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for CompletionHandler +template +struct associated_executor, Executor> { + using type = boost::asio::associated_executor_t; + + static type get(const ceph::async::CompletionHandler& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a wrapped completion handler with bound arguments. + * + * Binds the given arguments to a handler, and returns a CompletionHandler that + * is callable with no arguments. This is similar to std::bind(), except that + * all arguments must be provided. Move-only argument types are supported as + * long as the CompletionHandler's 'operator() &&' overload is used, i.e. + * std::move(handler)(). + * + * Example use: + * + * // bind the arguments (5, "hello") to a callback lambda: + * auto callback = [] (int a, std::string b) {}; + * auto handler = bind_handler(callback, 5, "hello"); + * + * // execute the bound handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see CompletionHandler + */ +template +auto bind_handler(Handler&& h, Args&& ...args) +{ + return CompletionHandler{std::forward(h), + std::make_tuple(std::forward(args)...)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_BIND_HANDLER_H diff --git a/src/common/async/completion.h b/src/common/async/completion.h new file mode 100644 index 000000000000..cab7eaccfad4 --- /dev/null +++ b/src/common/async/completion.h @@ -0,0 +1,315 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_COMPLETION_H +#define CEPH_ASYNC_COMPLETION_H + +#include + +#include "bind_handler.h" + +namespace ceph::async { + +/** + * Abstract completion handler interface for use with boost::asio. + * + * Memory management is performed using the Handler's 'associated allocator', + * which carries the additional requirement that its memory be released before + * the Handler is invoked. This allows memory allocated for one asynchronous + * operation to be reused in its continuation. Because of this requirement, any + * calls to invoke the completion must first release ownership of it. To enforce + * this, the static functions defer()/dispatch()/post() take the completion by + * rvalue-reference to std::unique_ptr, i.e. std::move(completion). + * + * Handlers may also have an 'associated executor', so the calls to defer(), + * dispatch(), and post() are forwarded to that executor. If there is no + * associated executor (which is generally the case unless one was bound with + * boost::asio::bind_executor()), the executor passed to Completion::create() + * is used as a default. + * + * Example use: + * + * // declare a Completion type with Signature = void(int, string) + * using MyCompletion = ceph::async::Completion; + * + * // create a completion with the given callback: + * std::unique_ptr c; + * c = MyCompletion::create(ex, [] (int a, const string& b) {}); + * + * // bind arguments to the callback and post to its associated executor: + * MyCompletion::post(std::move(c), 5, "hello"); + * + * + * Additional user data may be stored along with the Completion to take + * advantage of the handler allocator optimization. This is accomplished by + * specifying its type in the template parameter T. For example, the type + * Completion contains a public member variable 'int user_data'. + * Any additional arguments to Completion::create() will be forwarded to type + * T's constructor. + * + * If the AsBase type tag is used, as in Completion>, + * the Completion will inherit from T instead of declaring it as a member + * variable. + * + * When invoking the completion handler via defer(), dispatch(), or post(), + * care must be taken when passing arguments that refer to user data, because + * its memory is destroyed prior to invocation. In such cases, the user data + * should be moved/copied out of the Completion first. + */ +template +class Completion; + + +/// type tag for UserData +template struct AsBase {}; + +namespace detail { + +/// optional user data to be stored with the Completion +template +struct UserData { + T user_data; + template + UserData(Args&& ...args) + : user_data(std::forward(args)...) + {} +}; +// AsBase specialization inherits from T +template +struct UserData> : public T { + template + UserData(Args&& ...args) + : T(std::forward(args)...) + {} +}; +// void specialization +template <> +class UserData {}; + +} // namespace detail + + +// template specialization to pull the Signature's args apart +template +class Completion : public detail::UserData { + protected: + // internal interfaces for type-erasure on the Handler/Executor. uses + // tuple to provide perfect forwarding because you can't make + // virtual function templates + virtual void destroy_defer(std::tuple&& args) = 0; + virtual void destroy_dispatch(std::tuple&& args) = 0; + virtual void destroy_post(std::tuple&& args) = 0; + virtual void destroy() = 0; + + // constructor is protected, use create(). any constructor arguments are + // forwarded to UserData + template + Completion(TArgs&& ...args) + : detail::UserData(std::forward(args)...) + {} + public: + virtual ~Completion() = default; + + // use the virtual destroy() interface on delete. this allows the derived + // class to manage its memory using Handler allocators, without having to use + // a custom Deleter for std::unique_ptr<> + static void operator delete(void *p) { + static_cast(p)->destroy(); + } + + /// completion factory function that uses the handler's associated allocator. + /// any additional arguments are forwared to T's constructor + template + static std::unique_ptr + create(const Executor1& ex1, Handler&& handler, TArgs&& ...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then defer() it on its associated executor + template + static void defer(std::unique_ptr&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then dispatch() it on its associated executor + template + static void dispatch(std::unique_ptr&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then post() it to its associated executor + template + static void post(std::unique_ptr&& c, Args2&&...args); +}; + +namespace detail { + +// concrete Completion that knows how to invoke the completion handler. this +// observes all of the 'Requirements on asynchronous operations' specified by +// the C++ Networking TS +template +class CompletionImpl final : public Completion { + // use Handler's associated executor (or Executor1 by default) for callbacks + using Executor2 = boost::asio::associated_executor_t; + // maintain work on both executors + using Work1 = boost::asio::executor_work_guard; + using Work2 = boost::asio::executor_work_guard; + std::pair work; + Handler handler; + + // use Handler's associated allocator + using Alloc2 = boost::asio::associated_allocator_t; + using Traits2 = std::allocator_traits; + using RebindAlloc2 = typename Traits2::template rebind_alloc; + using RebindTraits2 = std::allocator_traits; + + // placement new for the handler allocator + static void* operator new(size_t, RebindAlloc2 alloc2) { + return RebindTraits2::allocate(alloc2, 1); + } + // placement delete for when the constructor throws during placement new + static void operator delete(void *p, RebindAlloc2 alloc2) { + RebindTraits2::deallocate(alloc2, static_cast(p), 1); + } + + void destroy_defer(std::tuple&& args) override { + auto w = std::move(work); + auto f = CompletionHandler{std::move(handler), std::move(args)}; + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().defer(std::move(f), alloc2); + } + void destroy_dispatch(std::tuple&& args) override { + auto w = std::move(work); + auto f = CompletionHandler{std::move(handler), std::move(args)}; + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().dispatch(std::move(f), alloc2); + } + void destroy_post(std::tuple&& args) override { + auto w = std::move(work); + auto f = CompletionHandler{std::move(handler), std::move(args)}; + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().post(std::move(f), alloc2); + } + void destroy() override { + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + } + + // constructor is private, use create(). extra constructor arguments are + // forwarded to UserData + template + CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args) + : Completion(std::forward(args)...), + work(ex1, boost::asio::make_work_guard(handler, ex1)), + handler(std::move(handler)) + {} + + public: + template + static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) { + auto alloc2 = boost::asio::get_associated_allocator(handler); + using Ptr = std::unique_ptr; + return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler), + std::forward(args)...)}; + } + + static void operator delete(void *p) { + static_cast(p)->destroy(); + } +}; + +} // namespace detail + + +template +template +std::unique_ptr> +Completion::create(const Executor1& ex, + Handler&& handler, TArgs&& ...args) +{ + using Impl = detail::CompletionImpl; + return Impl::create(ex, std::forward(handler), + std::forward(args)...); +} + +template +template +void Completion::defer(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_defer(std::make_tuple(std::forward(args)...)); +} + +template +template +void Completion::dispatch(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_dispatch(std::make_tuple(std::forward(args)...)); +} + +template +template +void Completion::post(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_post(std::make_tuple(std::forward(args)...)); +} + + +/// completion factory function that uses the handler's associated allocator. +/// any additional arguments are forwared to T's constructor +template +std::unique_ptr> +create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args) +{ + return Completion::create(ex, std::forward(handler), + std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then defer() it on its associated executor +template +void defer(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::defer(std::move(ptr), std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then dispatch() it on its associated executor +template +void dispatch(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::dispatch(std::move(ptr), std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then post() it to its associated executor +template +void post(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::post(std::move(ptr), std::forward(args)...); +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_COMPLETION_H diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 702cb647d2c7..d68cfa2e5565 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -290,3 +290,7 @@ add_executable(unittest_hobject test_hobject.cc $) target_link_libraries(unittest_hobject global ceph-common) add_ceph_unittest(unittest_hobject) + +add_executable(unittest_async_completion test_async_completion.cc) +add_ceph_unittest(unittest_async_completion) +target_link_libraries(unittest_async_completion Boost::system) diff --git a/src/test/common/test_async_completion.cc b/src/test/common/test_async_completion.cc new file mode 100644 index 000000000000..d490744ca5bc --- /dev/null +++ b/src/test/common/test_async_completion.cc @@ -0,0 +1,229 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/completion.h" +#include +#include +#include + +namespace ceph::async { + +using boost::system::error_code; + +struct move_only { + move_only() = default; + move_only(move_only&&) = default; + move_only& operator=(move_only&&) = default; + move_only(const move_only&) = delete; + move_only& operator=(const move_only&) = delete; +}; + +TEST(AsyncCompletion, BindHandler) +{ + auto h1 = [] (int i, char c) {}; + auto b1 = bind_handler(std::move(h1), 5, 'a'); + b1(); + const auto& c1 = b1; + c1(); + std::move(b1)(); + + // move-only types can be forwarded with 'operator() &&' + auto h2 = [] (move_only&& m) {}; + auto b2 = bind_handler(std::move(h2), move_only{}); + std::move(b2)(); + + // references bound with std::ref() and passed to all operator() overloads + auto h3 = [] (int& c) { c++; }; + int count = 0; + auto b3 = bind_handler(std::move(h3), std::ref(count)); + EXPECT_EQ(0, count); + b3(); + EXPECT_EQ(1, count); + const auto& c3 = b3; + c3(); + EXPECT_EQ(2, count); + std::move(b3)(); + EXPECT_EQ(3, count); +} + +TEST(AsyncCompletion, MoveOnly) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + std::optional ec1, ec2; + { + // move-only user data + using Completion = Completion; + auto c = Completion::create(ex1, [&ec1] (error_code ec) { ec1 = ec; }); + Completion::post(std::move(c), boost::asio::error::operation_aborted); + EXPECT_FALSE(ec1); + } + { + // move-only handler + using Completion = Completion; + auto c = Completion::create(ex1, [&ec2, m=move_only{}] (error_code ec) { ec2 = ec; }); + Completion::post(std::move(c), boost::asio::error::operation_aborted); + EXPECT_FALSE(ec2); + } + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec1); + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec2); +} + +TEST(AsyncCompletion, VoidCompletion) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + using Completion = Completion; + std::optional ec1; + + auto c = Completion::create(ex1, [&ec1] (error_code ec) { ec1 = ec; }); + Completion::post(std::move(c), boost::asio::error::operation_aborted); + + EXPECT_FALSE(ec1); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec1); +} + +TEST(AsyncCompletion, CompletionList) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + using T = AsBase>; + using Completion = Completion; + boost::intrusive::list completions; + int completed = 0; + for (int i = 0; i < 3; i++) { + auto c = Completion::create(ex1, [&] { completed++; }); + completions.push_back(*c.release()); + } + completions.clear_and_dispose([] (Completion *c) { + Completion::post(std::unique_ptr{c}); + }); + + EXPECT_EQ(0, completed); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + EXPECT_EQ(3, completed); +} + +TEST(AsyncCompletion, CompletionPair) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + using T = std::pair; + using Completion = Completion; + + std::optional t; + auto c = Completion::create(ex1, [&] (int first, std::string second) { + t = T{first, std::move(second)}; + }, 2, "hello"); + + auto data = std::move(c->user_data); + Completion::post(std::move(c), data.first, std::move(data.second)); + + EXPECT_FALSE(t); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(t); + EXPECT_EQ(2, t->first); + EXPECT_EQ("hello", t->second); +} + +TEST(AsyncCompletion, CompletionReference) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + using Completion = Completion; + + auto c = Completion::create(ex1, [] (int& i) { ++i; }); + + int i = 42; + Completion::post(std::move(c), std::ref(i)); + + EXPECT_EQ(42, i); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + EXPECT_EQ(43, i); +} + +struct throws_on_move { + throws_on_move() = default; + throws_on_move(throws_on_move&&) { + throw std::runtime_error("oops"); + } + throws_on_move& operator=(throws_on_move&&) { + throw std::runtime_error("oops"); + } + throws_on_move(const throws_on_move&) = default; + throws_on_move& operator=(const throws_on_move&) = default; +}; + +TEST(AsyncCompletion, ThrowOnCtor) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + { + using Completion = Completion; + + // throw on Handler move construction + EXPECT_THROW(Completion::create(ex1, [t=throws_on_move{}] (int& i) { ++i; }), + std::runtime_error); + } + { + using T = throws_on_move; + using Completion = Completion; + + // throw on UserData construction + EXPECT_THROW(Completion::create(ex1, [] (int& i) { ++i; }, throws_on_move{}), + std::runtime_error); + } +} + +TEST(AsyncCompletion, FreeFunctions) +{ + boost::asio::io_context context; + auto ex1 = context.get_executor(); + + auto c1 = create_completion(ex1, [] {}); + post(std::move(c1)); + + auto c2 = create_completion(ex1, [] (int) {}, 5); + defer(std::move(c2), c2->user_data); + + context.poll(); + EXPECT_TRUE(context.stopped()); +} + +} // namespace ceph::async