--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef LIBRADOS_ASIO_H
+#define LIBRADOS_ASIO_H
+
+#include <memory>
+#include <boost/asio.hpp>
+#include "include/rados/librados.hpp"
+
+/// Defines asynchronous librados operations that satisfy all of the
+/// "Requirements on asynchronous operations" imposed by the C++ Networking TS
+/// in section 13.2.7. Many of the type and variable names below are taken
+/// directly from those requirements.
+///
+/// The current draft of the Networking TS (as of 2017-11-27) is available here:
+/// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/n4711.pdf
+///
+/// The boost::asio documentation duplicates these requirements here:
+/// http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/asynchronous_operations.html
+
+namespace librados {
+
+namespace detail {
+
+/// unique_ptr with custom deleter for AioCompletion
+struct release_completion {
+ void operator()(AioCompletion *c) { c->release(); }
+};
+using unique_completion_ptr =
+ std::unique_ptr<AioCompletion, release_completion>;
+
+/// Invokes the given completion handler. When the type of Result is not void,
+/// storage is provided for it and that result is passed as an additional
+/// argument to the handler.
+template <typename Result>
+struct invoker {
+ Result result;
+ template <typename CompletionHandler>
+ void invoke(CompletionHandler& completion_handler,
+ boost::system::error_code ec) {
+ completion_handler(ec, std::move(result));
+ }
+};
+// specialization for Result=void
+template <>
+struct invoker<void> {
+ template <typename CompletionHandler>
+ void invoke(CompletionHandler& completion_handler,
+ boost::system::error_code ec) {
+ completion_handler(ec);
+ }
+};
+
+/// The function object that eventually gets dispatched back to its associated
+/// executor to invoke the completion_handler with our bound error_code and
+/// Result arguments.
+/// Inherits from invoker for empty base optimization when Result=void.
+template <typename CompletionHandler, typename Result, typename Executor2>
+struct bound_completion_handler : public invoker<Result> {
+ CompletionHandler completion_handler; //< upcall handler from CompletionToken
+ boost::system::error_code ec;
+ Executor2 ex2; //< associated completion handler executor
+
+ bound_completion_handler(CompletionHandler& completion_handler, Executor2 ex2)
+ : completion_handler(completion_handler), ex2(ex2)
+ {
+ // static check for CompletionHandler concept (must be CopyConstructible and
+ // callable with no arguments)
+ using namespace boost::asio;
+ BOOST_ASIO_COMPLETION_HANDLER_CHECK(bound_completion_handler, *this) type_check;
+ }
+
+ /// Invoke the completion handler with our bound arguments
+ void operator()() {
+ this->invoke(completion_handler, ec);
+ }
+
+ /// Delegate to CompletionHandler's associated allocator
+ using allocator_type = boost::asio::associated_allocator_t<CompletionHandler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(completion_handler);
+ }
+
+ /// Use our associated completion handler executor
+ using executor_type = Executor2;
+ executor_type get_executor() const noexcept {
+ return ex2;
+ }
+};
+
+/// Operation state needed to invoke the handler on completion. This state must
+/// be allocated so that its address can be passed through the AioCompletion
+/// callback. This memory is managed by the CompletionHandler's associated
+/// allocator according to "Allocation of intermediate storage" requirements.
+template <typename CompletionHandler, typename Result, typename Executor1>
+struct op_state {
+ /// completion handler executor, which delegates to CompletionHandler's
+ /// associated executor or defaults to the io executor
+ using Executor2 = boost::asio::associated_executor_t<CompletionHandler, Executor1>;
+
+ /// maintain outstanding work on the io executor
+ boost::asio::executor_work_guard<Executor1> work1;
+ /// maintain outstanding work on the completion handler executor
+ boost::asio::executor_work_guard<Executor2> work2;
+
+ /// the function object that invokes the completion handler
+ bound_completion_handler<CompletionHandler, Result, Executor2> f;
+ unique_completion_ptr completion; //< the AioCompletion
+
+ op_state(CompletionHandler& completion_handler, Executor1 ex1,
+ unique_completion_ptr&& completion)
+ : work1(ex1),
+ work2(boost::asio::get_associated_executor(completion_handler, ex1)),
+ f(completion_handler, work2.get_executor()),
+ completion(std::move(completion))
+ {}
+
+ using Handler = CompletionHandler; // the following macro wants a Handler type
+
+ /// Defines a scoped 'op_state::ptr' type that uses CompletionHandler's
+ /// associated allocator to manage its memory
+ BOOST_ASIO_DEFINE_HANDLER_PTR(op_state);
+};
+
+/// Handler allocators require that their memory is released before the handler
+/// itself is invoked. Return a moved copy of the bound completion handler after
+/// destroying/deallocating the op_state.
+template <typename StatePtr>
+auto release_handler(StatePtr&& p) -> decltype(p.p->f)
+{
+ // move the completion handler out of the memory being released
+ auto f = std::move(p.p->f);
+ // return the memory to the moved handler's associated allocator
+ p.h = std::addressof(f.completion_handler);
+ p.reset();
+ return f;
+}
+
+/// AioCompletion callback function, executed in the librados finisher thread.
+template <typename State, typename StatePtr = typename State::ptr>
+inline void aio_op_dispatch(completion_t cb, void *arg)
+{
+ auto op = static_cast<State*>(arg);
+ const int ret = op->completion->get_return_value();
+ // maintain work until the completion handler is dispatched. these would
+ // otherwise be destroyed with op_state in release_handler()
+ auto work1 = std::move(op->work1);
+ auto work2 = std::move(op->work2);
+ // return the memory to the handler allocator
+ auto f = release_handler<StatePtr>({nullptr, op, op});
+ if (ret < 0) {
+ // assign the bound error code
+ f.ec.assign(-ret, boost::system::system_category());
+ }
+ // dispatch the completion handler using its associated allocator/executor
+ auto alloc2 = boost::asio::get_associated_allocator(f);
+ f.ex2.dispatch(std::move(f), alloc2);
+}
+
+/// Create an AioCompletion and return it as a unique_ptr.
+template <typename State>
+inline unique_completion_ptr make_completion(void *op)
+{
+ auto cb = aio_op_dispatch<State>;
+ return unique_completion_ptr{Rados::aio_create_completion(op, nullptr, cb)};
+}
+
+/// Allocate op state using the CompletionHandler's associated allocator.
+template <typename Result, typename Executor1, typename CompletionHandler,
+ typename State = op_state<CompletionHandler, Result, Executor1>,
+ typename StatePtr = typename State::ptr>
+StatePtr make_op_state(Executor1&& ex1, CompletionHandler& handler)
+{
+ // allocate a block of memory with StatePtr::allocate()
+ StatePtr p = {std::addressof(handler), StatePtr::allocate(handler), 0};
+ // create an AioCompletion to call aio_op_dispatch() with this pointer
+ auto completion = make_completion<State>(p.v);
+ // construct the op_state in place
+ p.p = new (p.v) State(handler, ex1, std::move(completion));
+ return p;
+}
+
+} // namespace detail
+
+
+/// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a
+/// given handler with signature (boost::system::error_code, bufferlist).
+template <typename ExecutionContext, typename CompletionToken,
+ typename Signature = void(boost::system::error_code, bufferlist)>
+BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
+async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+ size_t len, uint64_t off, CompletionToken&& token)
+{
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = detail::make_op_state<bufferlist>(ctx.get_executor(),
+ init.completion_handler);
+
+ int ret = io.aio_read(oid, p.p->completion.get(),
+ &p.p->f.result, len, off);
+ if (ret < 0) {
+ // post the completion after releasing the handler-allocated memory
+ p.p->f.ec.assign(-ret, boost::system::system_category());
+ boost::asio::post(detail::release_handler(std::move(p)));
+ } else {
+ p.v = p.p = nullptr; // release ownership until completion
+ }
+ return init.result.get();
+}
+
+/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
+/// given handler with signature (boost::system::error_code).
+template <typename ExecutionContext, typename CompletionToken,
+ typename Signature = void(boost::system::error_code)>
+BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
+async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+ bufferlist &bl, size_t len, uint64_t off,
+ CompletionToken&& token)
+{
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = detail::make_op_state<void>(ctx.get_executor(),
+ init.completion_handler);
+
+ int ret = io.aio_write(oid, p.p->completion.get(), bl, len, off);
+ if (ret < 0) {
+ p.p->f.ec.assign(-ret, boost::system::system_category());
+ boost::asio::post(detail::release_handler(std::move(p)));
+ } else {
+ p.v = p.p = nullptr; // release ownership until completion
+ }
+ return init.result.get();
+}
+
+/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
+/// given handler with signature (boost::system::error_code, bufferlist).
+template <typename ExecutionContext, typename CompletionToken,
+ typename Signature = void(boost::system::error_code, bufferlist)>
+BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
+async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+ ObjectReadOperation *op, int flags,
+ CompletionToken&& token)
+{
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = detail::make_op_state<bufferlist>(ctx.get_executor(),
+ init.completion_handler);
+
+ int ret = io.aio_operate(oid, p.p->completion.get(), op,
+ flags, &p.p->f.result);
+ if (ret < 0) {
+ p.p->f.ec.assign(-ret, boost::system::system_category());
+ boost::asio::post(detail::release_handler(std::move(p)));
+ } else {
+ p.v = p.p = nullptr; // release ownership until completion
+ }
+ return init.result.get();
+}
+
+/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
+/// given handler with signature (boost::system::error_code).
+template <typename ExecutionContext, typename CompletionToken,
+ typename Signature = void(boost::system::error_code)>
+BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
+async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
+ ObjectWriteOperation *op, int flags,
+ CompletionToken &&token)
+{
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = detail::make_op_state<void>(ctx.get_executor(),
+ init.completion_handler);
+
+ int ret = io.aio_operate(oid, p.p->completion.get(), op, flags);
+ if (ret < 0) {
+ p.p->f.ec.assign(-ret, boost::system::system_category());
+ boost::asio::post(detail::release_handler(std::move(p)));
+ } else {
+ p.v = p.p = nullptr; // release ownership until completion
+ }
+ return init.result.get();
+}
+
+} // namespace librados
+
+#endif // LIBRADOS_ASIO_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "librados/librados_asio.h"
+#include <gtest/gtest.h>
+
+#include "common/ceph_argparse.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+
+#ifdef HAVE_BOOST_CONTEXT
+#define BOOST_COROUTINES_NO_DEPRECATION_WARNING
+#include <boost/asio/spawn.hpp>
+#endif
+#include <boost/asio/use_future.hpp>
+
+#define dout_subsys ceph_subsys_rados
+#define dout_context g_ceph_context
+
+// test fixture for global setup/teardown
+class AsioRados : public ::testing::Test {
+ static constexpr auto poolname = "ceph_test_rados_api_asio";
+
+ protected:
+ static librados::Rados rados;
+ static librados::IoCtx io;
+ // writes to snapio fail immediately with -EROFS. this is used to test errors
+ // that come from inside the initiating function, rather than passed to the
+ // AioCompletion callback
+ static librados::IoCtx snapio;
+
+ public:
+ static void SetUpTestCase() {
+ ASSERT_EQ(0, rados.init_with_context(g_ceph_context));
+ ASSERT_EQ(0, rados.connect());
+ // open/create test pool
+ int r = rados.ioctx_create(poolname, io);
+ if (r == -ENOENT) {
+ r = rados.pool_create(poolname);
+ if (r == -EEXIST) {
+ r = 0;
+ } else if (r == 0) {
+ r = rados.ioctx_create(poolname, io);
+ }
+ }
+ ASSERT_EQ(0, r);
+ ASSERT_EQ(0, rados.ioctx_create(poolname, snapio));
+ snapio.snap_set_read(1);
+ // initialize the "exist" object
+ bufferlist bl;
+ bl.append("hello");
+ ASSERT_EQ(0, io.write_full("exist", bl));
+ }
+
+ static void TearDownTestCase() {
+ rados.shutdown();
+ }
+};
+librados::Rados AsioRados::rados;
+librados::IoCtx AsioRados::io;
+librados::IoCtx AsioRados::snapio;
+
+TEST_F(AsioRados, AsyncReadCallback)
+{
+ boost::asio::io_service service;
+
+ auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ EXPECT_FALSE(ec);
+ EXPECT_EQ("hello", bl.to_str());
+ };
+ librados::async_read(service, io, "exist", 256, 0, success_cb);
+
+ auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ };
+ librados::async_read(service, io, "noexist", 256, 0, failure_cb);
+
+ service.run();
+}
+
+TEST_F(AsioRados, AsyncReadFuture)
+{
+ boost::asio::io_service service;
+
+ std::future<bufferlist> f1 = librados::async_read(service, io, "exist", 256,
+ 0, boost::asio::use_future);
+ std::future<bufferlist> f2 = librados::async_read(service, io, "noexist", 256,
+ 0, boost::asio::use_future);
+
+ service.run();
+
+ EXPECT_NO_THROW({
+ auto bl = f1.get();
+ EXPECT_EQ("hello", bl.to_str());
+ });
+ EXPECT_THROW(f2.get(), boost::system::system_error);
+}
+
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(AsioRados, AsyncReadYield)
+{
+ boost::asio::io_service service;
+
+ auto success_cr = [&] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]);
+ EXPECT_FALSE(ec);
+ EXPECT_EQ("hello", bl.to_str());
+ };
+ boost::asio::spawn(service, success_cr);
+
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]);
+ EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ };
+ boost::asio::spawn(service, failure_cr);
+
+ service.run();
+}
+#endif
+
+TEST_F(AsioRados, AsyncWriteCallback)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ auto success_cb = [&] (boost::system::error_code ec) {
+ EXPECT_FALSE(ec);
+ };
+ librados::async_write(service, io, "exist", bl, bl.length(), 0,
+ success_cb);
+
+ auto failure_cb = [&] (boost::system::error_code ec) {
+ EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ };
+ librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
+ failure_cb);
+
+ service.run();
+}
+
+TEST_F(AsioRados, AsyncWriteFuture)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ auto f1 = librados::async_write(service, io, "exist", bl, bl.length(), 0,
+ boost::asio::use_future);
+ auto f2 = librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
+ boost::asio::use_future);
+
+ service.run();
+
+ EXPECT_NO_THROW(f1.get());
+ EXPECT_THROW(f2.get(), boost::system::system_error);
+}
+
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(AsioRados, AsyncWriteYield)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ auto success_cr = [&] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ librados::async_write(service, io, "exist", bl, bl.length(), 0,
+ yield[ec]);
+ EXPECT_FALSE(ec);
+ EXPECT_EQ("hello", bl.to_str());
+ };
+ boost::asio::spawn(service, success_cr);
+
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
+ yield[ec]);
+ EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ };
+ boost::asio::spawn(service, failure_cr);
+
+ service.run();
+}
+#endif
+
+TEST_F(AsioRados, AsyncReadOperationCallback)
+{
+ boost::asio::io_service service;
+ {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ EXPECT_FALSE(ec);
+ EXPECT_EQ("hello", bl.to_str());
+ };
+ librados::async_operate(service, io, "exist", &op, 0, success_cb);
+ }
+ {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) {
+ EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ };
+ librados::async_operate(service, io, "noexist", &op, 0, failure_cb);
+ }
+ service.run();
+}
+
+TEST_F(AsioRados, AsyncReadOperationFuture)
+{
+ boost::asio::io_service service;
+ std::future<bufferlist> f1;
+ {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ f1 = librados::async_operate(service, io, "exist", &op, 0,
+ boost::asio::use_future);
+ }
+ std::future<bufferlist> f2;
+ {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ f2 = librados::async_operate(service, io, "noexist", &op, 0,
+ boost::asio::use_future);
+ }
+ service.run();
+
+ EXPECT_NO_THROW({
+ auto bl = f1.get();
+ EXPECT_EQ("hello", bl.to_str());
+ });
+ EXPECT_THROW(f2.get(), boost::system::system_error);
+}
+
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(AsioRados, AsyncReadOperationYield)
+{
+ boost::asio::io_service service;
+
+ auto success_cr = [&] (boost::asio::yield_context yield) {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ boost::system::error_code ec;
+ auto bl = librados::async_operate(service, io, "exist", &op, 0,
+ yield[ec]);
+ EXPECT_FALSE(ec);
+ EXPECT_EQ("hello", bl.to_str());
+ };
+ boost::asio::spawn(service, success_cr);
+
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
+ librados::ObjectReadOperation op;
+ op.read(0, 0, nullptr, nullptr);
+ boost::system::error_code ec;
+ auto bl = librados::async_operate(service, io, "noexist", &op, 0,
+ yield[ec]);
+ EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
+ };
+ boost::asio::spawn(service, failure_cr);
+
+ service.run();
+}
+#endif
+
+TEST_F(AsioRados, AsyncWriteOperationCallback)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ auto success_cb = [&] (boost::system::error_code ec) {
+ EXPECT_FALSE(ec);
+ };
+ librados::async_operate(service, io, "exist", &op, 0, success_cb);
+ }
+ {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ auto failure_cb = [&] (boost::system::error_code ec) {
+ EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ };
+ librados::async_operate(service, snapio, "exist", &op, 0, failure_cb);
+ }
+ service.run();
+}
+
+TEST_F(AsioRados, AsyncWriteOperationFuture)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ std::future<void> f1;
+ {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ f1 = librados::async_operate(service, io, "exist", &op, 0,
+ boost::asio::use_future);
+ }
+ std::future<void> f2;
+ {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ f2 = librados::async_operate(service, snapio, "exist", &op, 0,
+ boost::asio::use_future);
+ }
+ service.run();
+
+ EXPECT_NO_THROW(f1.get());
+ EXPECT_THROW(f2.get(), boost::system::system_error);
+}
+
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(AsioRados, AsyncWriteOperationYield)
+{
+ boost::asio::io_service service;
+
+ bufferlist bl;
+ bl.append("hello");
+
+ auto success_cr = [&] (boost::asio::yield_context yield) {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ boost::system::error_code ec;
+ librados::async_operate(service, io, "exist", &op, 0, yield[ec]);
+ EXPECT_FALSE(ec);
+ };
+ boost::asio::spawn(service, success_cr);
+
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
+ librados::ObjectWriteOperation op;
+ op.write_full(bl);
+ boost::system::error_code ec;
+ librados::async_operate(service, snapio, "exist", &op, 0, yield[ec]);
+ EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
+ };
+ boost::asio::spawn(service, failure_cr);
+
+ service.run();
+}
+#endif
+
+int main(int argc, char **argv)
+{
+ vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+ env_to_vec(args);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(cct.get());
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}