]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: Non-blocking condition variable
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 31 May 2023 20:36:03 +0000 (16:36 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:13 +0000 (11:10 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/async/async_cond.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_cond.cc [new file with mode: 0644]

diff --git a/src/common/async/async_cond.h b/src/common/async/async_cond.h
new file mode 100644 (file)
index 0000000..8f6ccd6
--- /dev/null
@@ -0,0 +1,175 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+/// \file common/async/async_cond.h
+
+#include <cassert>
+#include <mutex>
+#include <utility>
+#include <vector>
+
+#include <boost/asio/execution/context.hpp>
+
+#include <boost/asio/any_completion_handler.hpp>
+#include <boost/asio/append.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/consign.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/execution_context.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/query.hpp>
+#include <boost/asio/strand.hpp>
+
+#include <boost/system/error_code.hpp>
+
+#include "common/async/service.h"
+
+namespace ceph::async {
+/// \brief A non-blocking condition variable
+///
+/// This is effectively a condition variable, but rather than
+/// blocking, the `async_wait` function takes an Asio completion token
+/// and invokes the associated handler on wakeup.
+///
+/// \tparam Executor An asio::executor
+/// \tparam BasicLockable The mutex
+template<typename Executor, typename BasicLockable = std::mutex>
+class async_cond : public service_list_base_hook {
+  friend service<async_cond>;
+
+  Executor executor;
+  service<async_cond>& svc;
+
+  std::mutex m;
+  std::vector<std::pair<
+    boost::asio::any_completion_handler<
+    void(boost::system::error_code)>, std::unique_lock<BasicLockable>*>> handlers;
+
+  void service_shutdown() {
+    std::unique_lock l(m);
+    handlers.clear();
+  }
+
+public:
+
+  /// \brief Constructor
+  ///
+  /// \param executor The executor on which to post handlers.
+  async_cond(Executor executor)
+    : executor(executor),
+      svc(boost::asio::use_service<service<async_cond>>(
+           boost::asio::query(executor, boost::asio::execution::context))) {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+
+  /// \brief Destructor
+  ///
+  /// Will call `cancel`, dispatching all handlers with
+  /// `asio::error::operation_aborted`.
+  ~async_cond() {
+    cancel();
+    svc.remove(*this);
+  }
+
+  async_cond(const async_cond&) = delete;
+  async_cond& operator =(const async_cond&) = delete;
+  async_cond(async_cond&&) = delete;
+  async_cond& operator =(async_cond&&) = delete;
+
+  /// \brief Wait for notification
+  ///
+  /// This will dispatch the handler for the provided completion token
+  /// when `notify` is called. If `notify` has already been called,
+  /// dispatch immediately.
+  ///
+  /// \param token Boost.Asio completion token.
+  ///
+  /// \returns Whatever is appropriate to the completion token. See
+  /// Boost.Asio documentation.
+  template<boost::asio::completion_token_for<void(boost::system::error_code)>
+          CompletionToken>
+  auto async_wait(std::unique_lock<BasicLockable>& caller_lock,
+                 CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    assert(caller_lock.owns_lock());
+    auto consigned = asio::consign(
+      std::forward<CompletionToken>(token), asio::make_work_guard(
+       asio::get_associated_executor(token, get_executor())));
+    return asio::async_initiate<decltype(consigned), void(sys::error_code)>(
+      [this, &caller_lock](auto handler) {
+       std::unique_lock l(m);
+       handlers.emplace_back(std::move(handler), &caller_lock);
+       caller_lock.unlock();
+      }, consigned);
+  }
+
+  /// \brief Dispatch all handlers currently waiting
+  ///
+  /// Dispatches all handlers currently waiting. After this function
+  /// is called, any new calls to `wait` will return immediately.
+  void notify(std::unique_lock<BasicLockable>& caller_lock) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    assert(caller_lock.owns_lock());
+    std::unique_lock l(m);
+    if (!handlers.empty()) {
+      auto workhandlers = std::move(handlers);
+      handlers.resize(0);
+      l.unlock();
+      for (auto&& [handler, lock] : workhandlers) {
+       asio::post(executor,
+                  [handler = std::move(handler), lock = lock]() mutable {
+                    lock->lock();
+                    std::move(handler)(sys::error_code{});
+                  });
+
+      }
+    }
+  }
+
+  /// \brief Dispatch all handlers currently waiting with an error
+  ///
+  /// This wakes all handlers currently waiting and dispatches them with
+  /// `asio::error::operation_aborted`.
+  void cancel() {
+    namespace asio = boost::asio;
+    std::unique_lock l(m);
+    if (!handlers.empty()) {
+      auto workhandlers = std::move(handlers);
+      handlers.resize(0);
+      l.unlock();
+      for (auto&& [handler, lock] : workhandlers) {
+       asio::post(executor,
+                  [handler = std::move(handler), lock = lock]() mutable {
+                    lock->lock();
+                    std::move(handler)(asio::error::operation_aborted);
+                  });
+
+      }
+    }
+  }
+
+  /// \brief Type of the executor we dispatch on
+  using executor_type = Executor;
+
+  /// \brief Return the executor we dispatch on
+  auto get_executor() const {
+    return executor;
+  }
+};
+}
index 590f66f55c19630f30bd6d199f10aab113a5e444..7182904ea9a29df52d220616302f447bdb1a8948 100644 (file)
@@ -488,7 +488,12 @@ target_link_libraries(unittest_async_call ceph-common Boost::system
   GTest::GTest)
 add_ceph_unittest(unittest_async_call)
 
+
 add_executable(unittest_librados_completion test_librados_completion.cc)
 target_link_libraries(unittest_librados_completion librados ceph-common
   Boost::system GTest::GTest)
 add_ceph_unittest(unittest_librados_completion)
+
+add_executable(unittest_async_cond test_async_cond.cc)
+target_link_libraries(unittest_async_cond GTest::GTest)
+add_ceph_unittest(unittest_async_cond)
diff --git a/src/test/common/test_async_cond.cc b/src/test/common/test_async_cond.cc
new file mode 100644 (file)
index 0000000..9bd35c4
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/async_cond.h"
+
+#include <coroutine>
+
+#include <boost/asio/io_context.hpp>
+
+#include <gtest/gtest.h>
+
+namespace asio = boost::asio;
+namespace sys = boost::system;
+
+namespace async = ceph::async;
+
+enum response : int {
+  error, success
+};
+
+std::mutex m;
+
+struct waiter {
+  std::unique_lock<std::mutex> l{m};
+  int* i;
+
+  waiter(int* i) : i(i) {}
+
+  void operator ()(sys::error_code ec) {
+    EXPECT_TRUE(l.owns_lock());
+    *i = ec ? error : success;
+    l.unlock();
+    delete this;
+  }
+};
+
+
+TEST(async_cond, lambdata)
+{
+  asio::io_context io_context;
+  async::async_cond cond(io_context.get_executor());
+  std::array<int, 5> data;
+  data.fill(0xdeadbeef);
+
+
+  for (auto i = 0; i < std::ssize(data); ++i) {
+    auto c = new waiter(data.data() + i);
+    cond.async_wait(c->l, std::ref(*c));
+  }
+  std::unique_lock l(m);
+  cond.notify(l);
+  l.unlock();
+  io_context.run();
+  for (const auto& d : data) {
+    ASSERT_EQ(success, d);
+  }
+}
+
+TEST(async_cond, lambdataReset)
+{
+  asio::io_context io_context;
+  async::async_cond cond(io_context.get_executor());
+  std::array<int, 5> data;
+  data.fill(0xdeadbeef);
+
+  for (auto i = 0; i < std::ssize(data); ++i) {
+    auto c = new waiter(data.data() + i);
+    cond.async_wait(c->l, std::ref(*c));
+  }
+  cond.cancel();
+  io_context.run();
+  for (const auto& d : data) {
+    ASSERT_EQ(error, d);
+  }
+}