From: Adam C. Emerson Date: Wed, 31 May 2023 20:36:03 +0000 (-0400) Subject: common/async: Non-blocking condition variable X-Git-Tag: v20.3.0~169^2~37 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fba9af39164d799fe19037744c81f3b1bb45b8ef;p=ceph.git common/async: Non-blocking condition variable Signed-off-by: Adam C. Emerson --- diff --git a/src/common/async/async_cond.h b/src/common/async/async_cond.h new file mode 100644 index 0000000000000..8f6ccd632a3a8 --- /dev/null +++ b/src/common/async/async_cond.h @@ -0,0 +1,175 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +/// \file common/async/async_cond.h + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "common/async/service.h" + +namespace ceph::async { +/// \brief A non-blocking condition variable +/// +/// This is effectively a condition variable, but rather than +/// blocking, the `async_wait` function takes an Asio completion token +/// and invokes the associated handler on wakeup. +/// +/// \tparam Executor An asio::executor +/// \tparam BasicLockable The mutex +template +class async_cond : public service_list_base_hook { + friend service; + + Executor executor; + service& svc; + + std::mutex m; + std::vector, std::unique_lock*>> handlers; + + void service_shutdown() { + std::unique_lock l(m); + handlers.clear(); + } + +public: + + /// \brief Constructor + /// + /// \param executor The executor on which to post handlers. + async_cond(Executor executor) + : executor(executor), + svc(boost::asio::use_service>( + boost::asio::query(executor, boost::asio::execution::context))) { + // register for service_shutdown() notifications + svc.add(*this); + } + + /// \brief Destructor + /// + /// Will call `cancel`, dispatching all handlers with + /// `asio::error::operation_aborted`. + ~async_cond() { + cancel(); + svc.remove(*this); + } + + async_cond(const async_cond&) = delete; + async_cond& operator =(const async_cond&) = delete; + async_cond(async_cond&&) = delete; + async_cond& operator =(async_cond&&) = delete; + + /// \brief Wait for notification + /// + /// This will dispatch the handler for the provided completion token + /// when `notify` is called. If `notify` has already been called, + /// dispatch immediately. + /// + /// \param token Boost.Asio completion token. + /// + /// \returns Whatever is appropriate to the completion token. See + /// Boost.Asio documentation. + template + CompletionToken> + auto async_wait(std::unique_lock& caller_lock, + CompletionToken&& token) { + namespace asio = boost::asio; + namespace sys = boost::system; + assert(caller_lock.owns_lock()); + auto consigned = asio::consign( + std::forward(token), asio::make_work_guard( + asio::get_associated_executor(token, get_executor()))); + return asio::async_initiate( + [this, &caller_lock](auto handler) { + std::unique_lock l(m); + handlers.emplace_back(std::move(handler), &caller_lock); + caller_lock.unlock(); + }, consigned); + } + + /// \brief Dispatch all handlers currently waiting + /// + /// Dispatches all handlers currently waiting. After this function + /// is called, any new calls to `wait` will return immediately. + void notify(std::unique_lock& caller_lock) { + namespace asio = boost::asio; + namespace sys = boost::system; + assert(caller_lock.owns_lock()); + std::unique_lock l(m); + if (!handlers.empty()) { + auto workhandlers = std::move(handlers); + handlers.resize(0); + l.unlock(); + for (auto&& [handler, lock] : workhandlers) { + asio::post(executor, + [handler = std::move(handler), lock = lock]() mutable { + lock->lock(); + std::move(handler)(sys::error_code{}); + }); + + } + } + } + + /// \brief Dispatch all handlers currently waiting with an error + /// + /// This wakes all handlers currently waiting and dispatches them with + /// `asio::error::operation_aborted`. + void cancel() { + namespace asio = boost::asio; + std::unique_lock l(m); + if (!handlers.empty()) { + auto workhandlers = std::move(handlers); + handlers.resize(0); + l.unlock(); + for (auto&& [handler, lock] : workhandlers) { + asio::post(executor, + [handler = std::move(handler), lock = lock]() mutable { + lock->lock(); + std::move(handler)(asio::error::operation_aborted); + }); + + } + } + } + + /// \brief Type of the executor we dispatch on + using executor_type = Executor; + + /// \brief Return the executor we dispatch on + auto get_executor() const { + return executor; + } +}; +} diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 590f66f55c196..7182904ea9a29 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -488,7 +488,12 @@ target_link_libraries(unittest_async_call ceph-common Boost::system GTest::GTest) add_ceph_unittest(unittest_async_call) + add_executable(unittest_librados_completion test_librados_completion.cc) target_link_libraries(unittest_librados_completion librados ceph-common Boost::system GTest::GTest) add_ceph_unittest(unittest_librados_completion) + +add_executable(unittest_async_cond test_async_cond.cc) +target_link_libraries(unittest_async_cond GTest::GTest) +add_ceph_unittest(unittest_async_cond) diff --git a/src/test/common/test_async_cond.cc b/src/test/common/test_async_cond.cc new file mode 100644 index 0000000000000..9bd35c40c7ec6 --- /dev/null +++ b/src/test/common/test_async_cond.cc @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/async_cond.h" + +#include + +#include + +#include + +namespace asio = boost::asio; +namespace sys = boost::system; + +namespace async = ceph::async; + +enum response : int { + error, success +}; + +std::mutex m; + +struct waiter { + std::unique_lock l{m}; + int* i; + + waiter(int* i) : i(i) {} + + void operator ()(sys::error_code ec) { + EXPECT_TRUE(l.owns_lock()); + *i = ec ? error : success; + l.unlock(); + delete this; + } +}; + + +TEST(async_cond, lambdata) +{ + asio::io_context io_context; + async::async_cond cond(io_context.get_executor()); + std::array data; + data.fill(0xdeadbeef); + + + for (auto i = 0; i < std::ssize(data); ++i) { + auto c = new waiter(data.data() + i); + cond.async_wait(c->l, std::ref(*c)); + } + std::unique_lock l(m); + cond.notify(l); + l.unlock(); + io_context.run(); + for (const auto& d : data) { + ASSERT_EQ(success, d); + } +} + +TEST(async_cond, lambdataReset) +{ + asio::io_context io_context; + async::async_cond cond(io_context.get_executor()); + std::array data; + data.fill(0xdeadbeef); + + for (auto i = 0; i < std::ssize(data); ++i) { + auto c = new waiter(data.data() + i); + cond.async_wait(c->l, std::ref(*c)); + } + cond.cancel(); + io_context.run(); + for (const auto& d : data) { + ASSERT_EQ(error, d); + } +}