From: Casey Bodley Date: Fri, 4 May 2018 14:52:04 +0000 (-0400) Subject: librados: use ceph::async::Completion for asio bindings X-Git-Tag: v14.0.1~1224^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=237db74cfeaed28e2d1e2857807c60a5f88b8a97;p=ceph.git librados: use ceph::async::Completion for asio bindings Signed-off-by: Casey Bodley --- diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 4764791c8a3..3ddc1972c7c 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -14,9 +14,8 @@ #ifndef LIBRADOS_ASIO_H #define LIBRADOS_ASIO_H -#include -#include #include "include/rados/librados.hpp" +#include "common/async/completion.h" /// Defines asynchronous librados operations that satisfy all of the /// "Requirements on asynchronous operations" imposed by the C++ Networking TS @@ -34,256 +33,154 @@ namespace librados { namespace detail { /// unique_ptr with custom deleter for AioCompletion -struct release_completion { +struct AioCompletionDeleter { void operator()(AioCompletion *c) { c->release(); } }; -using unique_completion_ptr = - std::unique_ptr; +using unique_aio_completion_ptr = + std::unique_ptr; /// Invokes the given completion handler. When the type of Result is not void, /// storage is provided for it and that result is passed as an additional /// argument to the handler. template -struct invoker { +struct Invoker { + using Signature = void(boost::system::error_code, Result); Result result; - template - void invoke(CompletionHandler& completion_handler, - boost::system::error_code ec) { - completion_handler(ec, std::move(result)); + template + void dispatch(Completion&& completion, boost::system::error_code ec) { + ceph::async::dispatch(std::move(completion), ec, std::move(result)); } }; // specialization for Result=void template <> -struct invoker { - template - void invoke(CompletionHandler& completion_handler, - boost::system::error_code ec) { - completion_handler(ec); +struct Invoker { + using Signature = void(boost::system::error_code); + template + void dispatch(Completion&& completion, boost::system::error_code ec) { + ceph::async::dispatch(std::move(completion), ec); } }; -/// The function object that eventually gets dispatched back to its associated -/// executor to invoke the completion_handler with our bound error_code and -/// Result arguments. -/// Inherits from invoker for empty base optimization when Result=void. -template -struct bound_completion_handler : public invoker { - CompletionHandler completion_handler; //< upcall handler from CompletionToken - boost::system::error_code ec; - Executor2 ex2; //< associated completion handler executor - - bound_completion_handler(CompletionHandler& completion_handler, Executor2 ex2) - : completion_handler(completion_handler), ex2(ex2) - { - // static check for CompletionHandler concept (must be CopyConstructible and - // callable with no arguments) - using namespace boost::asio; - BOOST_ASIO_LEGACY_COMPLETION_HANDLER_CHECK(bound_completion_handler, *this) type_check; - } - - /// Invoke the completion handler with our bound arguments - void operator()() { - this->invoke(completion_handler, ec); - } - - /// Delegate to CompletionHandler's associated allocator - using allocator_type = boost::asio::associated_allocator_t; - allocator_type get_allocator() const noexcept { - return boost::asio::get_associated_allocator(completion_handler); +template +struct AsyncOp : Invoker { + unique_aio_completion_ptr aio_completion; + + using Signature = typename Invoker::Signature; + using Completion = ceph::async::Completion>; + + static void aio_dispatch(completion_t cb, void *arg) { + // reclaim ownership of the completion + auto p = std::unique_ptr{static_cast(arg)}; + // move result out of Completion memory being freed + auto op = std::move(p->user_data); + const int ret = op.aio_completion->get_return_value(); + boost::system::error_code ec; + if (ret < 0) { + ec.assign(-ret, boost::system::system_category()); + } + op.dispatch(std::move(p), ec); } - /// Use our associated completion handler executor - using executor_type = Executor2; - executor_type get_executor() const noexcept { - return ex2; + template + static auto create(const Executor1& ex1, CompletionHandler&& handler) { + auto p = Completion::create(ex1, std::move(handler)); + p->user_data.aio_completion.reset( + Rados::aio_create_completion(p.get(), nullptr, aio_dispatch)); + return p; } }; -/// Operation state needed to invoke the handler on completion. This state must -/// be allocated so that its address can be passed through the AioCompletion -/// callback. This memory is managed by the CompletionHandler's associated -/// allocator according to "Allocation of intermediate storage" requirements. -template -struct op_state { - /// completion handler executor, which delegates to CompletionHandler's - /// associated executor or defaults to the io executor - using Executor2 = boost::asio::associated_executor_t; - - /// maintain outstanding work on the io executor - boost::asio::executor_work_guard work1; - /// maintain outstanding work on the completion handler executor - boost::asio::executor_work_guard work2; - - /// the function object that invokes the completion handler - bound_completion_handler f; - unique_completion_ptr completion; //< the AioCompletion - - op_state(CompletionHandler& completion_handler, Executor1 ex1, - unique_completion_ptr&& completion) - : work1(ex1), - work2(boost::asio::get_associated_executor(completion_handler, ex1)), - f(completion_handler, work2.get_executor()), - completion(std::move(completion)) - {} - - using Handler = CompletionHandler; // the following macro wants a Handler type - - /// Defines a scoped 'op_state::ptr' type that uses CompletionHandler's - /// associated allocator to manage its memory - BOOST_ASIO_DEFINE_HANDLER_PTR(op_state); -}; - -/// Handler allocators require that their memory is released before the handler -/// itself is invoked. Return a moved copy of the bound completion handler after -/// destroying/deallocating the op_state. -template -auto release_handler(StatePtr&& p) -> decltype(p.p->f) -{ - // move the completion handler out of the memory being released - auto f = std::move(p.p->f); - // return the memory to the moved handler's associated allocator - p.h = std::addressof(f.completion_handler); - p.reset(); - return f; -} - -/// AioCompletion callback function, executed in the librados finisher thread. -template -inline void aio_op_dispatch(completion_t cb, void *arg) -{ - auto op = static_cast(arg); - const int ret = op->completion->get_return_value(); - // maintain work until the completion handler is dispatched. these would - // otherwise be destroyed with op_state in release_handler() - auto work1 = std::move(op->work1); - auto work2 = std::move(op->work2); - // return the memory to the handler allocator - auto f = release_handler({nullptr, op, op}); - if (ret < 0) { - // assign the bound error code - f.ec.assign(-ret, boost::system::system_category()); - } - // dispatch the completion handler using its associated allocator/executor - auto alloc2 = boost::asio::get_associated_allocator(f); - f.ex2.dispatch(std::move(f), alloc2); -} - -/// Create an AioCompletion and return it as a unique_ptr. -template -inline unique_completion_ptr make_completion(void *op) -{ - auto cb = aio_op_dispatch; - return unique_completion_ptr{Rados::aio_create_completion(op, nullptr, cb)}; -} - -/// Allocate op state using the CompletionHandler's associated allocator. -template , - typename StatePtr = typename State::ptr> -StatePtr make_op_state(Executor1&& ex1, CompletionHandler& handler) -{ - // allocate a block of memory with StatePtr::allocate() - StatePtr p = {std::addressof(handler), StatePtr::allocate(handler), 0}; - // create an AioCompletion to call aio_op_dispatch() with this pointer - auto completion = make_completion(p.v); - // construct the op_state in place - p.p = new (p.v) State(handler, ex1, std::move(completion)); - return p; -} - } // namespace detail /// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a /// given handler with signature (boost::system::error_code, bufferlist). -template -BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) -async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, - size_t len, uint64_t off, CompletionToken&& token) +template +auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + size_t len, uint64_t off, CompletionToken&& token) { + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; boost::asio::async_completion init(token); - auto p = detail::make_op_state(ctx.get_executor(), - init.completion_handler); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; - int ret = io.aio_read(oid, p.p->completion.get(), - &p.p->f.result, len, off); + int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); if (ret < 0) { - // post the completion after releasing the handler-allocated memory - p.p->f.ec.assign(-ret, boost::system::system_category()); - boost::asio::post(detail::release_handler(std::move(p))); + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); } else { - p.v = p.p = nullptr; // release ownership until completion + p.release(); // release ownership until completion } return init.result.get(); } /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a /// given handler with signature (boost::system::error_code). -template -BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) -async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, - bufferlist &bl, size_t len, uint64_t off, - CompletionToken&& token) +template +auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + bufferlist &bl, size_t len, uint64_t off, + CompletionToken&& token) { + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; boost::asio::async_completion init(token); - auto p = detail::make_op_state(ctx.get_executor(), - init.completion_handler); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; - int ret = io.aio_write(oid, p.p->completion.get(), bl, len, off); + int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); if (ret < 0) { - p.p->f.ec.assign(-ret, boost::system::system_category()); - boost::asio::post(detail::release_handler(std::move(p))); + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec); } else { - p.v = p.p = nullptr; // release ownership until completion + p.release(); // release ownership until completion } return init.result.get(); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a /// given handler with signature (boost::system::error_code, bufferlist). -template -BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) -async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, - ObjectReadOperation *op, int flags, - CompletionToken&& token) +template +auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + ObjectReadOperation *read_op, int flags, + CompletionToken&& token) { + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; boost::asio::async_completion init(token); - auto p = detail::make_op_state(ctx.get_executor(), - init.completion_handler); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; - int ret = io.aio_operate(oid, p.p->completion.get(), op, - flags, &p.p->f.result); + int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, + flags, &op.result); if (ret < 0) { - p.p->f.ec.assign(-ret, boost::system::system_category()); - boost::asio::post(detail::release_handler(std::move(p))); + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); } else { - p.v = p.p = nullptr; // release ownership until completion + p.release(); // release ownership until completion } return init.result.get(); } /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a /// given handler with signature (boost::system::error_code). -template -BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) -async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, - ObjectWriteOperation *op, int flags, - CompletionToken &&token) +template +auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + ObjectWriteOperation *write_op, int flags, + CompletionToken &&token) { + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; boost::asio::async_completion init(token); - auto p = detail::make_op_state(ctx.get_executor(), - init.completion_handler); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; - int ret = io.aio_operate(oid, p.p->completion.get(), op, flags); + int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags); if (ret < 0) { - p.p->f.ec.assign(-ret, boost::system::system_category()); - boost::asio::post(detail::release_handler(std::move(p))); + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec); } else { - p.v = p.p = nullptr; // release ownership until completion + p.release(); // release ownership until completion } return init.result.get(); }