]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add co_waiter class template
authorCasey Bodley <cbodley@redhat.com>
Mon, 30 Jan 2023 01:30:30 +0000 (20:30 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/co_waiter.h [new file with mode: 0644]

diff --git a/src/common/async/co_waiter.h b/src/common/async/co_waiter.h
new file mode 100644 (file)
index 0000000..cd29d78
--- /dev/null
@@ -0,0 +1,164 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <exception>
+#include <optional>
+#include <boost/asio/append.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include "include/ceph_assert.h"
+
+namespace ceph::async {
+
+/// Captures an awaitable handler for deferred completion or cancellation.
+template <typename Ret, boost::asio::execution::executor Executor>
+class co_waiter {
+  using signature = void(std::exception_ptr, Ret);
+  using token_type = boost::asio::use_awaitable_t<Executor>;
+  using handler_type = typename boost::asio::async_result<
+      token_type, signature>::handler_type;
+  std::optional<handler_type> handler;
+
+  struct op_cancellation {
+    co_waiter* self;
+    void operator()(boost::asio::cancellation_type_t type) {
+      if (type != boost::asio::cancellation_type::none) {
+        self->cancel();
+      }
+    }
+  };
+ public:
+  co_waiter() = default;
+
+  // copy and move are disabled because the cancellation handler captures 'this'
+  co_waiter(const co_waiter&) = delete;
+  co_waiter& operator=(const co_waiter&) = delete;
+
+  /// Returns true if there's a handler awaiting completion.
+  bool waiting() const { return handler.has_value(); }
+
+  /// Returns an awaitable that blocks until complete() or cancel().
+  boost::asio::awaitable<Ret, Executor> get()
+  {
+    ceph_assert(!handler);
+    token_type token;
+    return boost::asio::async_initiate<token_type, signature>(
+        [this] (handler_type h) {
+          auto slot = boost::asio::get_associated_cancellation_slot(h);
+          if (slot.is_connected()) {
+            slot.template emplace<op_cancellation>(this);
+          }
+          handler.emplace(std::move(h));
+        }, token);
+  }
+
+  /// Schedule the completion handler with the given arguments.
+  void complete(std::exception_ptr eptr, Ret value)
+  {
+    ceph_assert(handler);
+    auto h = boost::asio::append(std::move(*handler), eptr, std::move(value));
+    handler.reset();
+    boost::asio::dispatch(std::move(h));
+  }
+
+  /// Cancel the coroutine with an operation_aborted exception.
+  void cancel()
+  {
+    if (handler) {
+      auto eptr = std::make_exception_ptr(
+          boost::system::system_error(
+              boost::asio::error::operation_aborted));
+      complete(eptr, Ret{});
+    }
+  }
+
+  /// Destroy the completion handler.
+  void shutdown()
+  {
+    handler.reset();
+  }
+};
+
+// specialization for Ret=void
+template <boost::asio::execution::executor Executor>
+class co_waiter<void, Executor> {
+  using signature = void(std::exception_ptr);
+  using token_type = boost::asio::use_awaitable_t<Executor>;
+  using handler_type = typename boost::asio::async_result<
+      token_type, signature>::handler_type;
+  std::optional<handler_type> handler;
+
+  struct op_cancellation {
+    co_waiter* self;
+    void operator()(boost::asio::cancellation_type_t type) {
+      if (type != boost::asio::cancellation_type::none) {
+        self->cancel();
+      }
+    }
+  };
+ public:
+  co_waiter() = default;
+
+  // copy and move are disabled because the cancellation handler captures 'this'
+  co_waiter(const co_waiter&) = delete;
+  co_waiter& operator=(const co_waiter&) = delete;
+
+  /// Returns true if there's a handler awaiting completion.
+  bool waiting() const { return handler.has_value(); }
+
+  /// Returns an awaitable that blocks until complete() or cancel().
+  boost::asio::awaitable<void, Executor> get()
+  {
+    ceph_assert(!handler);
+    token_type token;
+    return boost::asio::async_initiate<token_type, signature>(
+        [this] (handler_type h) {
+          auto slot = boost::asio::get_associated_cancellation_slot(h);
+          if (slot.is_connected()) {
+            slot.template emplace<op_cancellation>(this);
+          }
+          handler.emplace(std::move(h));
+        }, token);
+  }
+
+  /// Schedule the completion handler with the given arguments.
+  void complete(std::exception_ptr eptr)
+  {
+    ceph_assert(handler);
+    auto h = boost::asio::append(std::move(*handler), eptr);
+    handler.reset();
+    boost::asio::dispatch(std::move(h));
+  }
+
+  /// Cancel the coroutine with an operation_aborted exception.
+  void cancel()
+  {
+    if (handler) {
+      auto eptr = std::make_exception_ptr(
+          boost::system::system_error(
+              boost::asio::error::operation_aborted));
+      complete(eptr);
+    }
+  }
+
+  /// Destroy the completion handler.
+  void shutdown()
+  {
+    handler.reset();
+  }
+};
+
+} // namespace ceph::async