From: Casey Bodley Date: Mon, 13 Feb 2017 19:55:06 +0000 (-0500) Subject: librados: add async interfaces for use with Networking TS X-Git-Tag: v13.0.2~595^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F19054%2Fhead;p=ceph.git librados: add async interfaces for use with Networking TS Defines asynchronous librados operations that satisfy all of the "Requirements on asynchronous operations" imposed by the C++ Networking TS [1] in section 13.2.7. These operations are implemented in terms of boost::asio, but the interfaces themselves are free of boost types - this makes the transition to std::net trivial when it's available. These interfaces conform to the Extensible Asynchronous Model [2] that originated in boost::asio. This model allows the last 'handler' argument to either be a callback that gets the result, a coroutine yield_context that will suspend until completion, or a 'use_future' tag to request the result in a std::future (see the unit tests for examples of each). The 'Extensible' part also enables further integration with new frameworks. For now, only async_read(), async_write(), and the read/write variants of async_operate() are provided. [1] Working Draft, C++ Extensions for Networking http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/n4711.pdf [2] "Library Foundations for Asynchronous Operations" http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n3896.pdf Signed-off-by: Casey Bodley --- diff --git a/qa/workunits/rados/test.sh b/qa/workunits/rados/test.sh index 871c2310db83..cf6c46c8d56c 100755 --- a/qa/workunits/rados/test.sh +++ b/qa/workunits/rados/test.sh @@ -15,7 +15,7 @@ trap cleanup EXIT ERR HUP INT QUIT declare -A pids for f in \ - api_aio api_io api_list api_lock api_misc \ + api_aio api_io api_asio api_list api_lock api_misc \ api_tier api_pool api_snapshots api_stat api_watch_notify api_cmd \ api_service \ api_c_write_operations \ diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h new file mode 100644 index 000000000000..8aeb096966a1 --- /dev/null +++ b/src/librados/librados_asio.h @@ -0,0 +1,293 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef LIBRADOS_ASIO_H +#define LIBRADOS_ASIO_H + +#include +#include +#include "include/rados/librados.hpp" + +/// Defines asynchronous librados operations that satisfy all of the +/// "Requirements on asynchronous operations" imposed by the C++ Networking TS +/// in section 13.2.7. Many of the type and variable names below are taken +/// directly from those requirements. +/// +/// The current draft of the Networking TS (as of 2017-11-27) is available here: +/// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/n4711.pdf +/// +/// The boost::asio documentation duplicates these requirements here: +/// http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/asynchronous_operations.html + +namespace librados { + +namespace detail { + +/// unique_ptr with custom deleter for AioCompletion +struct release_completion { + void operator()(AioCompletion *c) { c->release(); } +}; +using unique_completion_ptr = + std::unique_ptr; + +/// Invokes the given completion handler. When the type of Result is not void, +/// storage is provided for it and that result is passed as an additional +/// argument to the handler. +template +struct invoker { + Result result; + template + void invoke(CompletionHandler& completion_handler, + boost::system::error_code ec) { + completion_handler(ec, std::move(result)); + } +}; +// specialization for Result=void +template <> +struct invoker { + template + void invoke(CompletionHandler& completion_handler, + boost::system::error_code ec) { + completion_handler(ec); + } +}; + +/// The function object that eventually gets dispatched back to its associated +/// executor to invoke the completion_handler with our bound error_code and +/// Result arguments. +/// Inherits from invoker for empty base optimization when Result=void. +template +struct bound_completion_handler : public invoker { + CompletionHandler completion_handler; //< upcall handler from CompletionToken + boost::system::error_code ec; + Executor2 ex2; //< associated completion handler executor + + bound_completion_handler(CompletionHandler& completion_handler, Executor2 ex2) + : completion_handler(completion_handler), ex2(ex2) + { + // static check for CompletionHandler concept (must be CopyConstructible and + // callable with no arguments) + using namespace boost::asio; + BOOST_ASIO_COMPLETION_HANDLER_CHECK(bound_completion_handler, *this) type_check; + } + + /// Invoke the completion handler with our bound arguments + void operator()() { + this->invoke(completion_handler, ec); + } + + /// Delegate to CompletionHandler's associated allocator + using allocator_type = boost::asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(completion_handler); + } + + /// Use our associated completion handler executor + using executor_type = Executor2; + executor_type get_executor() const noexcept { + return ex2; + } +}; + +/// Operation state needed to invoke the handler on completion. This state must +/// be allocated so that its address can be passed through the AioCompletion +/// callback. This memory is managed by the CompletionHandler's associated +/// allocator according to "Allocation of intermediate storage" requirements. +template +struct op_state { + /// completion handler executor, which delegates to CompletionHandler's + /// associated executor or defaults to the io executor + using Executor2 = boost::asio::associated_executor_t; + + /// maintain outstanding work on the io executor + boost::asio::executor_work_guard work1; + /// maintain outstanding work on the completion handler executor + boost::asio::executor_work_guard work2; + + /// the function object that invokes the completion handler + bound_completion_handler f; + unique_completion_ptr completion; //< the AioCompletion + + op_state(CompletionHandler& completion_handler, Executor1 ex1, + unique_completion_ptr&& completion) + : work1(ex1), + work2(boost::asio::get_associated_executor(completion_handler, ex1)), + f(completion_handler, work2.get_executor()), + completion(std::move(completion)) + {} + + using Handler = CompletionHandler; // the following macro wants a Handler type + + /// Defines a scoped 'op_state::ptr' type that uses CompletionHandler's + /// associated allocator to manage its memory + BOOST_ASIO_DEFINE_HANDLER_PTR(op_state); +}; + +/// Handler allocators require that their memory is released before the handler +/// itself is invoked. Return a moved copy of the bound completion handler after +/// destroying/deallocating the op_state. +template +auto release_handler(StatePtr&& p) -> decltype(p.p->f) +{ + // move the completion handler out of the memory being released + auto f = std::move(p.p->f); + // return the memory to the moved handler's associated allocator + p.h = std::addressof(f.completion_handler); + p.reset(); + return f; +} + +/// AioCompletion callback function, executed in the librados finisher thread. +template +inline void aio_op_dispatch(completion_t cb, void *arg) +{ + auto op = static_cast(arg); + const int ret = op->completion->get_return_value(); + // maintain work until the completion handler is dispatched. these would + // otherwise be destroyed with op_state in release_handler() + auto work1 = std::move(op->work1); + auto work2 = std::move(op->work2); + // return the memory to the handler allocator + auto f = release_handler({nullptr, op, op}); + if (ret < 0) { + // assign the bound error code + f.ec.assign(-ret, boost::system::system_category()); + } + // dispatch the completion handler using its associated allocator/executor + auto alloc2 = boost::asio::get_associated_allocator(f); + f.ex2.dispatch(std::move(f), alloc2); +} + +/// Create an AioCompletion and return it as a unique_ptr. +template +inline unique_completion_ptr make_completion(void *op) +{ + auto cb = aio_op_dispatch; + return unique_completion_ptr{Rados::aio_create_completion(op, nullptr, cb)}; +} + +/// Allocate op state using the CompletionHandler's associated allocator. +template , + typename StatePtr = typename State::ptr> +StatePtr make_op_state(Executor1&& ex1, CompletionHandler& handler) +{ + // allocate a block of memory with StatePtr::allocate() + StatePtr p = {std::addressof(handler), StatePtr::allocate(handler), 0}; + // create an AioCompletion to call aio_op_dispatch() with this pointer + auto completion = make_completion(p.v); + // construct the op_state in place + p.p = new (p.v) State(handler, ex1, std::move(completion)); + return p; +} + +} // namespace detail + + +/// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a +/// given handler with signature (boost::system::error_code, bufferlist). +template +BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) +async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + size_t len, uint64_t off, CompletionToken&& token) +{ + boost::asio::async_completion init(token); + auto p = detail::make_op_state(ctx.get_executor(), + init.completion_handler); + + int ret = io.aio_read(oid, p.p->completion.get(), + &p.p->f.result, len, off); + if (ret < 0) { + // post the completion after releasing the handler-allocated memory + p.p->f.ec.assign(-ret, boost::system::system_category()); + boost::asio::post(detail::release_handler(std::move(p))); + } else { + p.v = p.p = nullptr; // release ownership until completion + } + return init.result.get(); +} + +/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a +/// given handler with signature (boost::system::error_code). +template +BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) +async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + bufferlist &bl, size_t len, uint64_t off, + CompletionToken&& token) +{ + boost::asio::async_completion init(token); + auto p = detail::make_op_state(ctx.get_executor(), + init.completion_handler); + + int ret = io.aio_write(oid, p.p->completion.get(), bl, len, off); + if (ret < 0) { + p.p->f.ec.assign(-ret, boost::system::system_category()); + boost::asio::post(detail::release_handler(std::move(p))); + } else { + p.v = p.p = nullptr; // release ownership until completion + } + return init.result.get(); +} + +/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a +/// given handler with signature (boost::system::error_code, bufferlist). +template +BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) +async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + ObjectReadOperation *op, int flags, + CompletionToken&& token) +{ + boost::asio::async_completion init(token); + auto p = detail::make_op_state(ctx.get_executor(), + init.completion_handler); + + int ret = io.aio_operate(oid, p.p->completion.get(), op, + flags, &p.p->f.result); + if (ret < 0) { + p.p->f.ec.assign(-ret, boost::system::system_category()); + boost::asio::post(detail::release_handler(std::move(p))); + } else { + p.v = p.p = nullptr; // release ownership until completion + } + return init.result.get(); +} + +/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a +/// given handler with signature (boost::system::error_code). +template +BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature) +async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, + ObjectWriteOperation *op, int flags, + CompletionToken &&token) +{ + boost::asio::async_completion init(token); + auto p = detail::make_op_state(ctx.get_executor(), + init.completion_handler); + + int ret = io.aio_operate(oid, p.p->completion.get(), op, flags); + if (ret < 0) { + p.p->f.ec.assign(-ret, boost::system::system_category()); + boost::asio::post(detail::release_handler(std::move(p))); + } else { + p.v = p.p = nullptr; // release ownership until completion + } + return init.result.get(); +} + +} // namespace librados + +#endif // LIBRADOS_ASIO_H diff --git a/src/test/librados/CMakeLists.txt b/src/test/librados/CMakeLists.txt index 313eeea50ed7..1c909ee1ce0f 100644 --- a/src/test/librados/CMakeLists.txt +++ b/src/test/librados/CMakeLists.txt @@ -54,6 +54,16 @@ set_target_properties(ceph_test_rados_api_aio PROPERTIES COMPILE_FLAGS target_link_libraries(ceph_test_rados_api_aio librados ${UNITTEST_LIBS} radostest) +# ceph_test_rados_api_asio +add_executable(ceph_test_rados_api_asio asio.cc) +set_target_properties(ceph_test_rados_api_asio PROPERTIES COMPILE_FLAGS + ${UNITTEST_CXX_FLAGS}) +target_link_libraries(ceph_test_rados_api_asio global + librados ${UNITTEST_LIBS}) +if(WITH_BOOST_CONTEXT) + target_link_libraries(ceph_test_rados_api_asio Boost::coroutine Boost::context) +endif() + # ceph_test_rados_api_list add_executable(ceph_test_rados_api_list list.cc @@ -150,6 +160,7 @@ target_link_libraries(ceph_test_rados_api_snapshots install(TARGETS ceph_test_rados_api_aio + ceph_test_rados_api_asio ceph_test_rados_api_c_read_operations ceph_test_rados_api_c_write_operations ceph_test_rados_api_cmd diff --git a/src/test/librados/asio.cc b/src/test/librados/asio.cc new file mode 100644 index 000000000000..9c86ad420c84 --- /dev/null +++ b/src/test/librados/asio.cc @@ -0,0 +1,377 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "librados/librados_asio.h" +#include + +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "common/errno.h" +#include "global/global_init.h" + +#ifdef HAVE_BOOST_CONTEXT +#define BOOST_COROUTINES_NO_DEPRECATION_WARNING +#include +#endif +#include + +#define dout_subsys ceph_subsys_rados +#define dout_context g_ceph_context + +// test fixture for global setup/teardown +class AsioRados : public ::testing::Test { + static constexpr auto poolname = "ceph_test_rados_api_asio"; + + protected: + static librados::Rados rados; + static librados::IoCtx io; + // writes to snapio fail immediately with -EROFS. this is used to test errors + // that come from inside the initiating function, rather than passed to the + // AioCompletion callback + static librados::IoCtx snapio; + + public: + static void SetUpTestCase() { + ASSERT_EQ(0, rados.init_with_context(g_ceph_context)); + ASSERT_EQ(0, rados.connect()); + // open/create test pool + int r = rados.ioctx_create(poolname, io); + if (r == -ENOENT) { + r = rados.pool_create(poolname); + if (r == -EEXIST) { + r = 0; + } else if (r == 0) { + r = rados.ioctx_create(poolname, io); + } + } + ASSERT_EQ(0, r); + ASSERT_EQ(0, rados.ioctx_create(poolname, snapio)); + snapio.snap_set_read(1); + // initialize the "exist" object + bufferlist bl; + bl.append("hello"); + ASSERT_EQ(0, io.write_full("exist", bl)); + } + + static void TearDownTestCase() { + rados.shutdown(); + } +}; +librados::Rados AsioRados::rados; +librados::IoCtx AsioRados::io; +librados::IoCtx AsioRados::snapio; + +TEST_F(AsioRados, AsyncReadCallback) +{ + boost::asio::io_service service; + + auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) { + EXPECT_FALSE(ec); + EXPECT_EQ("hello", bl.to_str()); + }; + librados::async_read(service, io, "exist", 256, 0, success_cb); + + auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) { + EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); + }; + librados::async_read(service, io, "noexist", 256, 0, failure_cb); + + service.run(); +} + +TEST_F(AsioRados, AsyncReadFuture) +{ + boost::asio::io_service service; + + std::future f1 = librados::async_read(service, io, "exist", 256, + 0, boost::asio::use_future); + std::future f2 = librados::async_read(service, io, "noexist", 256, + 0, boost::asio::use_future); + + service.run(); + + EXPECT_NO_THROW({ + auto bl = f1.get(); + EXPECT_EQ("hello", bl.to_str()); + }); + EXPECT_THROW(f2.get(), boost::system::system_error); +} + +#ifdef HAVE_BOOST_CONTEXT +TEST_F(AsioRados, AsyncReadYield) +{ + boost::asio::io_service service; + + auto success_cr = [&] (boost::asio::yield_context yield) { + boost::system::error_code ec; + auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]); + EXPECT_FALSE(ec); + EXPECT_EQ("hello", bl.to_str()); + }; + boost::asio::spawn(service, success_cr); + + auto failure_cr = [&] (boost::asio::yield_context yield) { + boost::system::error_code ec; + auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]); + EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); + }; + boost::asio::spawn(service, failure_cr); + + service.run(); +} +#endif + +TEST_F(AsioRados, AsyncWriteCallback) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + auto success_cb = [&] (boost::system::error_code ec) { + EXPECT_FALSE(ec); + }; + librados::async_write(service, io, "exist", bl, bl.length(), 0, + success_cb); + + auto failure_cb = [&] (boost::system::error_code ec) { + EXPECT_EQ(boost::system::errc::read_only_file_system, ec); + }; + librados::async_write(service, snapio, "exist", bl, bl.length(), 0, + failure_cb); + + service.run(); +} + +TEST_F(AsioRados, AsyncWriteFuture) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + auto f1 = librados::async_write(service, io, "exist", bl, bl.length(), 0, + boost::asio::use_future); + auto f2 = librados::async_write(service, snapio, "exist", bl, bl.length(), 0, + boost::asio::use_future); + + service.run(); + + EXPECT_NO_THROW(f1.get()); + EXPECT_THROW(f2.get(), boost::system::system_error); +} + +#ifdef HAVE_BOOST_CONTEXT +TEST_F(AsioRados, AsyncWriteYield) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + auto success_cr = [&] (boost::asio::yield_context yield) { + boost::system::error_code ec; + librados::async_write(service, io, "exist", bl, bl.length(), 0, + yield[ec]); + EXPECT_FALSE(ec); + EXPECT_EQ("hello", bl.to_str()); + }; + boost::asio::spawn(service, success_cr); + + auto failure_cr = [&] (boost::asio::yield_context yield) { + boost::system::error_code ec; + librados::async_write(service, snapio, "exist", bl, bl.length(), 0, + yield[ec]); + EXPECT_EQ(boost::system::errc::read_only_file_system, ec); + }; + boost::asio::spawn(service, failure_cr); + + service.run(); +} +#endif + +TEST_F(AsioRados, AsyncReadOperationCallback) +{ + boost::asio::io_service service; + { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + auto success_cb = [&] (boost::system::error_code ec, bufferlist bl) { + EXPECT_FALSE(ec); + EXPECT_EQ("hello", bl.to_str()); + }; + librados::async_operate(service, io, "exist", &op, 0, success_cb); + } + { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + auto failure_cb = [&] (boost::system::error_code ec, bufferlist bl) { + EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); + }; + librados::async_operate(service, io, "noexist", &op, 0, failure_cb); + } + service.run(); +} + +TEST_F(AsioRados, AsyncReadOperationFuture) +{ + boost::asio::io_service service; + std::future f1; + { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + f1 = librados::async_operate(service, io, "exist", &op, 0, + boost::asio::use_future); + } + std::future f2; + { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + f2 = librados::async_operate(service, io, "noexist", &op, 0, + boost::asio::use_future); + } + service.run(); + + EXPECT_NO_THROW({ + auto bl = f1.get(); + EXPECT_EQ("hello", bl.to_str()); + }); + EXPECT_THROW(f2.get(), boost::system::system_error); +} + +#ifdef HAVE_BOOST_CONTEXT +TEST_F(AsioRados, AsyncReadOperationYield) +{ + boost::asio::io_service service; + + auto success_cr = [&] (boost::asio::yield_context yield) { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + boost::system::error_code ec; + auto bl = librados::async_operate(service, io, "exist", &op, 0, + yield[ec]); + EXPECT_FALSE(ec); + EXPECT_EQ("hello", bl.to_str()); + }; + boost::asio::spawn(service, success_cr); + + auto failure_cr = [&] (boost::asio::yield_context yield) { + librados::ObjectReadOperation op; + op.read(0, 0, nullptr, nullptr); + boost::system::error_code ec; + auto bl = librados::async_operate(service, io, "noexist", &op, 0, + yield[ec]); + EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); + }; + boost::asio::spawn(service, failure_cr); + + service.run(); +} +#endif + +TEST_F(AsioRados, AsyncWriteOperationCallback) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + { + librados::ObjectWriteOperation op; + op.write_full(bl); + auto success_cb = [&] (boost::system::error_code ec) { + EXPECT_FALSE(ec); + }; + librados::async_operate(service, io, "exist", &op, 0, success_cb); + } + { + librados::ObjectWriteOperation op; + op.write_full(bl); + auto failure_cb = [&] (boost::system::error_code ec) { + EXPECT_EQ(boost::system::errc::read_only_file_system, ec); + }; + librados::async_operate(service, snapio, "exist", &op, 0, failure_cb); + } + service.run(); +} + +TEST_F(AsioRados, AsyncWriteOperationFuture) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + std::future f1; + { + librados::ObjectWriteOperation op; + op.write_full(bl); + f1 = librados::async_operate(service, io, "exist", &op, 0, + boost::asio::use_future); + } + std::future f2; + { + librados::ObjectWriteOperation op; + op.write_full(bl); + f2 = librados::async_operate(service, snapio, "exist", &op, 0, + boost::asio::use_future); + } + service.run(); + + EXPECT_NO_THROW(f1.get()); + EXPECT_THROW(f2.get(), boost::system::system_error); +} + +#ifdef HAVE_BOOST_CONTEXT +TEST_F(AsioRados, AsyncWriteOperationYield) +{ + boost::asio::io_service service; + + bufferlist bl; + bl.append("hello"); + + auto success_cr = [&] (boost::asio::yield_context yield) { + librados::ObjectWriteOperation op; + op.write_full(bl); + boost::system::error_code ec; + librados::async_operate(service, io, "exist", &op, 0, yield[ec]); + EXPECT_FALSE(ec); + }; + boost::asio::spawn(service, success_cr); + + auto failure_cr = [&] (boost::asio::yield_context yield) { + librados::ObjectWriteOperation op; + op.write_full(bl); + boost::system::error_code ec; + librados::async_operate(service, snapio, "exist", &op, 0, yield[ec]); + EXPECT_EQ(boost::system::errc::read_only_file_system, ec); + }; + boost::asio::spawn(service, failure_cr); + + service.run(); +} +#endif + +int main(int argc, char **argv) +{ + vector args; + argv_to_vec(argc, (const char **)argv, args); + env_to_vec(args); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(cct.get()); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}