]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: use ceph::async::Completion for asio bindings 21920/head
authorCasey Bodley <cbodley@redhat.com>
Fri, 4 May 2018 14:52:04 +0000 (10:52 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 9 May 2018 20:35:20 +0000 (16:35 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/librados/librados_asio.h

index 4764791c8a3792f64420c3685c14cff06325bf36..3ddc1972c7c38e499f28409693f645ebf0cf1ed0 100644 (file)
@@ -14,9 +14,8 @@
 #ifndef LIBRADOS_ASIO_H
 #define LIBRADOS_ASIO_H
 
-#include <memory>
-#include <boost/asio.hpp>
 #include "include/rados/librados.hpp"
+#include "common/async/completion.h"
 
 /// Defines asynchronous librados operations that satisfy all of the
 /// "Requirements on asynchronous operations" imposed by the C++ Networking TS
@@ -34,256 +33,154 @@ namespace librados {
 namespace detail {
 
 /// unique_ptr with custom deleter for AioCompletion
-struct release_completion {
+struct AioCompletionDeleter {
   void operator()(AioCompletion *c) { c->release(); }
 };
-using unique_completion_ptr =
-    std::unique_ptr<AioCompletion, release_completion>;
+using unique_aio_completion_ptr =
+    std::unique_ptr<AioCompletion, AioCompletionDeleter>;
 
 /// Invokes the given completion handler. When the type of Result is not void,
 /// storage is provided for it and that result is passed as an additional
 /// argument to the handler.
 template <typename Result>
-struct invoker {
+struct Invoker {
+  using Signature = void(boost::system::error_code, Result);
   Result result;
-  template <typename CompletionHandler>
-  void invoke(CompletionHandler& completion_handler,
-              boost::system::error_code ec) {
-    completion_handler(ec, std::move(result));
+  template <typename Completion>
+  void dispatch(Completion&& completion, boost::system::error_code ec) {
+    ceph::async::dispatch(std::move(completion), ec, std::move(result));
   }
 };
 // specialization for Result=void
 template <>
-struct invoker<void> {
-  template <typename CompletionHandler>
-  void invoke(CompletionHandler& completion_handler,
-              boost::system::error_code ec) {
-    completion_handler(ec);
+struct Invoker<void> {
+  using Signature = void(boost::system::error_code);
+  template <typename Completion>
+  void dispatch(Completion&& completion, boost::system::error_code ec) {
+    ceph::async::dispatch(std::move(completion), ec);
   }
 };
 
-/// The function object that eventually gets dispatched back to its associated
-/// executor to invoke the completion_handler with our bound error_code and
-/// Result arguments.
-/// Inherits from invoker for empty base optimization when Result=void.
-template <typename CompletionHandler, typename Result, typename Executor2>
-struct bound_completion_handler : public invoker<Result> {
-  CompletionHandler completion_handler; //< upcall handler from CompletionToken
-  boost::system::error_code ec;
-  Executor2 ex2; //< associated completion handler executor
-
-  bound_completion_handler(CompletionHandler& completion_handler, Executor2 ex2)
-    : completion_handler(completion_handler), ex2(ex2)
-  {
-    // static check for CompletionHandler concept (must be CopyConstructible and
-    // callable with no arguments)
-    using namespace boost::asio;
-    BOOST_ASIO_LEGACY_COMPLETION_HANDLER_CHECK(bound_completion_handler, *this) type_check;
-  }
-
-  /// Invoke the completion handler with our bound arguments
-  void operator()() {
-    this->invoke(completion_handler, ec);
-  }
-
-  /// Delegate to CompletionHandler's associated allocator
-  using allocator_type = boost::asio::associated_allocator_t<CompletionHandler>;
-  allocator_type get_allocator() const noexcept {
-    return boost::asio::get_associated_allocator(completion_handler);
+template <typename Result>
+struct AsyncOp : Invoker<Result> {
+  unique_aio_completion_ptr aio_completion;
+
+  using Signature = typename Invoker<Result>::Signature;
+  using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
+
+  static void aio_dispatch(completion_t cb, void *arg) {
+    // reclaim ownership of the completion
+    auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
+    // move result out of Completion memory being freed
+    auto op = std::move(p->user_data);
+    const int ret = op.aio_completion->get_return_value();
+    boost::system::error_code ec;
+    if (ret < 0) {
+      ec.assign(-ret, boost::system::system_category());
+    }
+    op.dispatch(std::move(p), ec);
   }
 
-  /// Use our associated completion handler executor
-  using executor_type = Executor2;
-  executor_type get_executor() const noexcept {
-    return ex2;
+  template <typename Executor1, typename CompletionHandler>
+  static auto create(const Executor1& ex1, CompletionHandler&& handler) {
+    auto p = Completion::create(ex1, std::move(handler));
+    p->user_data.aio_completion.reset(
+        Rados::aio_create_completion(p.get(), nullptr, aio_dispatch));
+    return p;
   }
 };
 
-/// Operation state needed to invoke the handler on completion. This state must
-/// be allocated so that its address can be passed through the AioCompletion
-/// callback. This memory is managed by the CompletionHandler's associated
-/// allocator according to "Allocation of intermediate storage" requirements.
-template <typename CompletionHandler, typename Result, typename Executor1>
-struct op_state {
-  /// completion handler executor, which delegates to CompletionHandler's
-  /// associated executor or defaults to the io executor
-  using Executor2 = boost::asio::associated_executor_t<CompletionHandler, Executor1>;
-
-  /// maintain outstanding work on the io executor
-  boost::asio::executor_work_guard<Executor1> work1;
-  /// maintain outstanding work on the completion handler executor
-  boost::asio::executor_work_guard<Executor2> work2;
-
-  /// the function object that invokes the completion handler
-  bound_completion_handler<CompletionHandler, Result, Executor2> f;
-  unique_completion_ptr completion; //< the AioCompletion
-
-  op_state(CompletionHandler& completion_handler, Executor1 ex1,
-           unique_completion_ptr&& completion)
-    : work1(ex1),
-      work2(boost::asio::get_associated_executor(completion_handler, ex1)),
-      f(completion_handler, work2.get_executor()),
-      completion(std::move(completion))
-  {}
-
-  using Handler = CompletionHandler; // the following macro wants a Handler type
-
-  /// Defines a scoped 'op_state::ptr' type that uses CompletionHandler's
-  /// associated allocator to manage its memory
-  BOOST_ASIO_DEFINE_HANDLER_PTR(op_state);
-};
-
-/// Handler allocators require that their memory is released before the handler
-/// itself is invoked. Return a moved copy of the bound completion handler after
-/// destroying/deallocating the op_state.
-template <typename StatePtr>
-auto release_handler(StatePtr&& p) -> decltype(p.p->f)
-{
-  // move the completion handler out of the memory being released
-  auto f = std::move(p.p->f);
-  // return the memory to the moved handler's associated allocator
-  p.h = std::addressof(f.completion_handler);
-  p.reset();
-  return f;
-}
-
-/// AioCompletion callback function, executed in the librados finisher thread.
-template <typename State, typename StatePtr = typename State::ptr>
-inline void aio_op_dispatch(completion_t cb, void *arg)
-{
-  auto op = static_cast<State*>(arg);
-  const int ret = op->completion->get_return_value();
-  // maintain work until the completion handler is dispatched. these would
-  // otherwise be destroyed with op_state in release_handler()
-  auto work1 = std::move(op->work1);
-  auto work2 = std::move(op->work2);
-  // return the memory to the handler allocator
-  auto f = release_handler<StatePtr>({nullptr, op, op});
-  if (ret < 0) {
-    // assign the bound error code
-    f.ec.assign(-ret, boost::system::system_category());
-  }
-  // dispatch the completion handler using its associated allocator/executor
-  auto alloc2 = boost::asio::get_associated_allocator(f);
-  f.ex2.dispatch(std::move(f), alloc2);
-}
-
-/// Create an AioCompletion and return it as a unique_ptr.
-template <typename State>
-inline unique_completion_ptr make_completion(void *op)
-{
-  auto cb = aio_op_dispatch<State>;
-  return unique_completion_ptr{Rados::aio_create_completion(op, nullptr, cb)};
-}
-
-/// Allocate op state using the CompletionHandler's associated allocator.
-template <typename Result, typename Executor1, typename CompletionHandler,
-          typename State = op_state<CompletionHandler, Result, Executor1>,
-          typename StatePtr = typename State::ptr>
-StatePtr make_op_state(Executor1&& ex1, CompletionHandler& handler)
-{
-  // allocate a block of memory with StatePtr::allocate()
-  StatePtr p = {std::addressof(handler), StatePtr::allocate(handler), 0};
-  // create an AioCompletion to call aio_op_dispatch() with this pointer
-  auto completion = make_completion<State>(p.v);
-  // construct the op_state in place
-  p.p = new (p.v) State(handler, ex1, std::move(completion));
-  return p;
-}
-
 } // namespace detail
 
 
 /// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a
 /// given handler with signature (boost::system::error_code, bufferlist).
-template <typename ExecutionContext, typename CompletionToken,
-          typename Signature = void(boost::system::error_code, bufferlist)>
-BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
-async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
-           size_t len, uint64_t off, CompletionToken&& token)
+template <typename ExecutionContext, typename CompletionToken>
+auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+                size_t len, uint64_t off, CompletionToken&& token)
 {
+  using Op = detail::AsyncOp<bufferlist>;
+  using Signature = typename Op::Signature;
   boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = detail::make_op_state<bufferlist>(ctx.get_executor(),
-                                             init.completion_handler);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
 
-  int ret = io.aio_read(oid, p.p->completion.get(),
-                        &p.p->f.result, len, off);
+  int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
   if (ret < 0) {
-    // post the completion after releasing the handler-allocated memory
-    p.p->f.ec.assign(-ret, boost::system::system_category());
-    boost::asio::post(detail::release_handler(std::move(p)));
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec, bufferlist{});
   } else {
-    p.v = p.p = nullptr; // release ownership until completion
+    p.release(); // release ownership until completion
   }
   return init.result.get();
 }
 
 /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
 /// given handler with signature (boost::system::error_code).
-template <typename ExecutionContext, typename CompletionToken,
-          typename Signature = void(boost::system::error_code)>
-BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
-async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
-            bufferlist &bl, size_t len, uint64_t off,
-            CompletionToken&& token)
+template <typename ExecutionContext, typename CompletionToken>
+auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+                 bufferlist &bl, size_t len, uint64_t off,
+                 CompletionToken&& token)
 {
+  using Op = detail::AsyncOp<void>;
+  using Signature = typename Op::Signature;
   boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = detail::make_op_state<void>(ctx.get_executor(),
-                                       init.completion_handler);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
 
-  int ret = io.aio_write(oid, p.p->completion.get(), bl, len, off);
+  int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
   if (ret < 0) {
-    p.p->f.ec.assign(-ret, boost::system::system_category());
-    boost::asio::post(detail::release_handler(std::move(p)));
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec);
   } else {
-    p.v = p.p = nullptr; // release ownership until completion
+    p.release(); // release ownership until completion
   }
   return init.result.get();
 }
 
 /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
 /// given handler with signature (boost::system::error_code, bufferlist).
-template <typename ExecutionContext, typename CompletionToken,
-          typename Signature = void(boost::system::error_code, bufferlist)>
-BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
-async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
-              ObjectReadOperation *op, int flags,
-              CompletionToken&& token)
+template <typename ExecutionContext, typename CompletionToken>
+auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+                   ObjectReadOperation *read_op, int flags,
+                   CompletionToken&& token)
 {
+  using Op = detail::AsyncOp<bufferlist>;
+  using Signature = typename Op::Signature;
   boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = detail::make_op_state<bufferlist>(ctx.get_executor(),
-                                             init.completion_handler);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
 
-  int ret = io.aio_operate(oid, p.p->completion.get(), op,
-                           flags, &p.p->f.result);
+  int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
+                           flags, &op.result);
   if (ret < 0) {
-    p.p->f.ec.assign(-ret, boost::system::system_category());
-    boost::asio::post(detail::release_handler(std::move(p)));
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec, bufferlist{});
   } else {
-    p.v = p.p = nullptr; // release ownership until completion
+    p.release(); // release ownership until completion
   }
   return init.result.get();
 }
 
 /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
 /// given handler with signature (boost::system::error_code).
-template <typename ExecutionContext, typename CompletionToken,
-          typename Signature = void(boost::system::error_code)>
-BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
-async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
-              ObjectWriteOperation *op, int flags,
-              CompletionToken &&token)
+template <typename ExecutionContext, typename CompletionToken>
+auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+                   ObjectWriteOperation *write_op, int flags,
+                   CompletionToken &&token)
 {
+  using Op = detail::AsyncOp<void>;
+  using Signature = typename Op::Signature;
   boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = detail::make_op_state<void>(ctx.get_executor(),
-                                       init.completion_handler);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
 
-  int ret = io.aio_operate(oid, p.p->completion.get(), op, flags);
+  int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags);
   if (ret < 0) {
-    p.p->f.ec.assign(-ret, boost::system::system_category());
-    boost::asio::post(detail::release_handler(std::move(p)));
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec);
   } else {
-    p.v = p.p = nullptr; // release ownership until completion
+    p.release(); // release ownership until completion
   }
   return init.result.get();
 }