]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common: add coroutine integration for crimson futures 57344/head
authorSamuel Just <sjust@redhat.com>
Wed, 31 Jan 2024 23:34:34 +0000 (15:34 -0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 8 May 2024 07:05:37 +0000 (10:05 +0300)
Adds coroutine machinery for crimson errorated and interruptible futures.

Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit 1126ec3cf6285abbf6a875ab27daff397f09220b)

src/crimson/common/coroutine.h [new file with mode: 0644]
src/test/crimson/CMakeLists.txt
src/test/crimson/gtest_seastar.h
src/test/crimson/test_crimson_coroutine.cc [new file with mode: 0644]

diff --git a/src/crimson/common/coroutine.h b/src/crimson/common/coroutine.h
new file mode 100644 (file)
index 0000000..c4dfca5
--- /dev/null
@@ -0,0 +1,310 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <seastar/core/coroutine.hh>
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
+
+
+namespace crimson {
+namespace internal {
+
+template <typename Interruptor, typename Errorator>
+struct to_future {
+  template <typename T>
+  using future = crimson::interruptible::interruptible_future_detail<
+    typename Interruptor::condition,
+    typename Errorator::template future<T>>;
+};
+
+template <typename Errorator>
+struct to_future<void, Errorator> {
+  template <typename T>
+  using future = typename Errorator::template future<T>;
+};
+
+
+template <typename Interruptor>
+struct to_future<Interruptor, void> {
+  template <typename T>
+  using future = ::crimson::interruptible::interruptible_future<
+    typename Interruptor::condition, T>;
+};
+
+template <>
+struct to_future<void, void> {
+  template <typename T>
+  using future = seastar::future<T>;
+};
+
+
+template <typename Future>
+struct cond_checker {
+  using ref = std::unique_ptr<cond_checker>;
+  virtual std::optional<Future> may_interrupt() = 0;
+  virtual ~cond_checker() = default;
+};
+
+template <typename Interruptor>
+struct interrupt_cond_capture {
+  using InterruptCond = typename Interruptor::condition;
+  interruptible::InterruptCondRef<InterruptCond> cond;
+
+  template <typename Future>
+  struct type_erased_cond_checker final : cond_checker<Future> {
+    interruptible::InterruptCondRef<InterruptCond> cond;
+
+    template <typename T>
+    type_erased_cond_checker(T &&t) : cond(std::forward<T>(t)) {}
+
+    std::optional<Future> may_interrupt() final {
+      return cond->template may_interrupt<Future>();
+    }
+  };
+
+  template <typename Future>
+  typename cond_checker<Future>::ref capture_and_get_checker() {
+    ceph_assert(interruptible::interrupt_cond<InterruptCond>.interrupt_cond);
+    cond = interruptible::interrupt_cond<InterruptCond>.interrupt_cond;
+    return typename cond_checker<Future>::ref{
+      new type_erased_cond_checker<Future>{cond}
+    };
+  }
+
+  void restore() {
+    ceph_assert(cond);
+    interruptible::interrupt_cond<InterruptCond>.set(cond);
+  }
+
+  void reset() {
+    interruptible::interrupt_cond<InterruptCond>.reset();
+  }
+};
+
+template <>
+struct interrupt_cond_capture<void> {
+  template <typename Future>
+  typename cond_checker<Future>::ref capture_and_get_checker() {
+    return nullptr;
+  }
+};
+
+template <typename Interruptor>
+struct seastar_task_ancestor : protected seastar::task {};
+
+template <>
+struct seastar_task_ancestor<void> : public seastar::task {};
+
+template <typename Interruptor, typename Errorator, typename T>
+class promise_base : public seastar_task_ancestor<Interruptor> {
+protected:
+  seastar::promise<T> _promise;
+
+public:
+  interrupt_cond_capture<Interruptor> cond;
+
+  using errorator_type = Errorator;
+  using interruptor = Interruptor;
+  static constexpr bool is_errorated = !std::is_void<Errorator>::value;
+  static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
+
+  using _to_future =  to_future<Interruptor, Errorator>;
+
+  template <typename U=void>
+  using future = typename _to_future::template future<U>;
+
+  promise_base() = default;
+  promise_base(promise_base&&) = delete;
+  promise_base(const promise_base&) = delete;
+
+  void set_exception(std::exception_ptr&& eptr) noexcept {
+    _promise.set_exception(std::move(eptr));
+  }
+
+  void unhandled_exception() noexcept {
+    _promise.set_exception(std::current_exception());
+  }
+
+  future<T> get_return_object() noexcept {
+    return _promise.get_future();
+  }
+
+  std::suspend_never initial_suspend() noexcept { return { }; }
+  std::suspend_never final_suspend() noexcept { return { }; }
+
+  void run_and_dispose() noexcept final {
+    if constexpr (is_interruptible) {
+      cond.restore();
+    }
+    auto handle = std::coroutine_handle<promise_base>::from_promise(*this);
+    handle.resume();
+    if constexpr (is_interruptible) {
+      cond.reset();
+    }
+  }
+
+  seastar::task *waiting_task() noexcept override {
+    return _promise.waiting_task();
+  }
+  seastar::task *get_seastar_task() { return this; }
+};
+
+template <typename Interruptor, typename Errorator, typename T=void>
+class coroutine_traits {
+public:
+  class promise_type final : public promise_base<Interruptor, Errorator, T> {
+    using base = promise_base<Interruptor, Errorator, T>;
+  public:
+    template <typename... U>
+    void return_value(U&&... value) {
+      base::_promise.set_value(std::forward<U>(value)...);
+    }
+  };
+};
+
+
+template <typename Interruptor, typename Errorator>
+class coroutine_traits<Interruptor, Errorator> {
+public:
+  class promise_type final : public promise_base<Interruptor, Errorator, void> {
+    using base = promise_base<Interruptor, Errorator, void>;
+  public:
+    void return_void() noexcept {
+      base::_promise.set_value();
+    }
+  };
+};
+
+template <typename Interruptor, typename Errorator,
+         bool CheckPreempt, typename T=void>
+struct awaiter {
+  static constexpr bool is_errorated = !std::is_void<Errorator>::value;
+  static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
+
+  template <typename U=void>
+  using future = typename to_future<Interruptor, Errorator>::template future<U>;
+
+  future<T> _future;
+
+  typename cond_checker<future<T>>::ref checker;
+public:
+  explicit awaiter(future<T>&& f) noexcept : _future(std::move(f)) { }
+
+  awaiter(const awaiter&) = delete;
+  awaiter(awaiter&&) = delete;
+
+  bool await_ready() const noexcept {
+    return _future.available() && (!CheckPreempt || !seastar::need_preempt());
+  }
+
+  template <typename U>
+  void await_suspend(std::coroutine_handle<U> hndl) noexcept {
+    if constexpr (is_errorated) {
+      using dest_errorator_t  = typename U::errorator_type;
+      static_assert(dest_errorator_t::template contains_once_v<Errorator>,
+                   "conversion is possible to more-or-eq errorated future!");
+    }
+
+    checker =
+      hndl.promise().cond.template capture_and_get_checker<future<T>>();
+    if (!CheckPreempt || !_future.available()) {
+      _future.set_coroutine(*hndl.promise().get_seastar_task());
+    } else {
+      ::seastar::schedule(hndl.promise().get_seastar_task());
+    }
+  }
+
+  T await_resume() {
+    if (auto maybe_fut = checker ? checker->may_interrupt() : std::nullopt) {
+      // silence warning that we are discarding an exceptional future
+      if (_future.failed()) _future.get_exception();
+      if constexpr (is_errorated) {
+       return maybe_fut->unsafe_get0();
+      } else {
+       return maybe_fut->get0();
+      }
+    } else {
+      if constexpr (is_errorated) {
+       return _future.unsafe_get0();
+      } else {
+       return _future.get0();
+      }
+    }
+  }
+};
+
+}
+}
+
+template <template <typename> typename Container, typename T>
+auto operator co_await(
+  Container<crimson::errorated_future_marker<T>> f) noexcept {
+  using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
+  return crimson::internal::awaiter<void, Errorator, true, T>(std::move(f));
+}
+
+template <typename InterruptCond, typename T>
+auto operator co_await(
+  crimson::interruptible::interruptible_future_detail<
+    InterruptCond, seastar::future<T>
+  > f) noexcept {
+  return crimson::internal::awaiter<
+    crimson::interruptible::interruptor<InterruptCond>, void, true, T>(
+  std::move(f));
+}
+
+template <template <typename> typename Container,
+         typename InterruptCond, typename T>
+auto operator co_await(
+  crimson::interruptible::interruptible_future_detail<
+    InterruptCond, Container<crimson::errorated_future_marker<T>>
+  > f) noexcept {
+  using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
+  return crimson::internal::awaiter<
+    crimson::interruptible::interruptor<InterruptCond>,
+    typename Errorator::base_ertr, true, T>(
+  std::move(f));
+}
+
+namespace std {
+
+template <template <typename> typename Container,
+         typename T, typename... Args>
+class coroutine_traits<Container<crimson::errorated_future_marker<T>>, Args...> :
+    public crimson::internal::coroutine_traits<
+      void,
+      typename seastar::futurize<
+       Container<crimson::errorated_future_marker<T>>
+       >::errorator_type,
+  T> {};
+
+template <typename InterruptCond,
+         typename T, typename... Args>
+class coroutine_traits<
+  crimson::interruptible::interruptible_future_detail<
+    InterruptCond, seastar::future<T>
+    >, Args...> : public crimson::internal::coroutine_traits<
+  crimson::interruptible::interruptor<InterruptCond>,
+  void,
+  T> {};
+
+template <template <typename> typename Container,
+         typename InterruptCond,
+         typename T, typename... Args>
+class coroutine_traits<
+  crimson::interruptible::interruptible_future_detail<
+    InterruptCond, Container<crimson::errorated_future_marker<T>>
+    >, Args...> :
+    public crimson::internal::coroutine_traits<
+      crimson::interruptible::interruptor<InterruptCond>,
+      typename seastar::futurize<
+        crimson::interruptible::interruptible_future_detail<
+         InterruptCond,
+          Container<crimson::errorated_future_marker<T>>
+         >
+      >::errorator_type::base_ertr,
+      T> {};
+}
index e1a5dfe73dfd3bafb8f4e4958e6c1a08598e6f1a..c8c5c84e65c682ef7f490ff1f3f5aba13d5f058b 100644 (file)
@@ -111,6 +111,14 @@ target_link_libraries(
 add_ceph_unittest(unittest-seastar-errorator
   --memory 256M --smp 1)
 
+add_executable(unittest-crimson-coroutine
+  test_crimson_coroutine.cc)
+target_link_libraries(
+  unittest-crimson-coroutine
+  crimson::gtest)
+add_ceph_unittest(unittest-crimson-coroutine
+  --memory 256M --smp 1)
+
 add_executable(unittest-crimson-scrub
   test_crimson_scrub.cc
   ${PROJECT_SOURCE_DIR}/src/crimson/osd/scrub/scrub_machine.cc
index 20709a3eec4870e5cb7372d8d1904d26d44ea729..34967c2531510e8f3100d861faa41422ae9d32df 100644 (file)
@@ -3,6 +3,7 @@
 
 #pragma once
 
+#include "crimson/common/errorator.h"
 #include "gtest/gtest.h"
 
 #include "seastar_runner.h"
@@ -15,6 +16,15 @@ struct seastar_test_suite_t : public ::testing::Test {
     return seastar_env.run(std::forward<Func>(func));
   }
 
+  template <typename Func>
+  void run_ertr(Func &&func) {
+    return run(
+      [func=std::forward<Func>(func)]() mutable {
+       return std::invoke(std::move(func)).handle_error(
+         crimson::ct_error::assert_all("error"));
+      });
+  }
+
   template <typename Func>
   void run_async(Func &&func) {
     run(
@@ -23,6 +33,23 @@ struct seastar_test_suite_t : public ::testing::Test {
       });
   }
 
+  template <typename F>
+  auto scl(F &&f) {
+    return [fptr = std::make_unique<F>(std::forward<F>(f))]() mutable {
+      return std::invoke(*fptr).finally([fptr=std::move(fptr)] {});
+    };
+  }
+
+  auto run_scl(auto &&f) {
+    return run([this, f=std::forward<decltype(f)>(f)]() mutable {
+      return std::invoke(scl(std::move(f)));
+    });
+  }
+
+  auto run_ertr_scl(auto &&f) {
+    return run_ertr(scl(std::forward<decltype(f)>(f)));
+  }
+
   virtual seastar::future<> set_up_fut() { return seastar::now(); }
   void SetUp() final {
     return run([this] { return set_up_fut(); });
diff --git a/src/test/crimson/test_crimson_coroutine.cc b/src/test/crimson/test_crimson_coroutine.cc
new file mode 100644 (file)
index 0000000..2b19ca0
--- /dev/null
@@ -0,0 +1,327 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+#include <numeric>
+
+#include "seastar/core/sleep.hh"
+
+#include "crimson/common/coroutine.h"
+#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
+#include "crimson/common/log.h"
+
+#include "test/crimson/gtest_seastar.h"
+
+struct coroutine_test_t : public seastar_test_suite_t {
+  struct interruption_state_t {
+    bool interrupted = false;
+  } interruption_state;
+
+  class test_interruption : public std::exception
+  {};
+
+  class test_interrupt_cond {
+    interruption_state_t *int_state = nullptr;
+  public:
+    test_interrupt_cond() = delete;
+    test_interrupt_cond(interruption_state_t *int_state)
+      : int_state(int_state) {}
+
+    template <typename T>
+    std::optional<T> may_interrupt() {
+      ceph_assert(int_state);
+      if (int_state->interrupted) {
+       return seastar::futurize<T>::make_exception_future(
+         test_interruption()
+       );
+      } else {
+       return std::nullopt;
+      }
+    }
+
+    template <typename T>
+    static constexpr bool is_interruption_v = std::is_same_v<
+      T, test_interruption>;
+
+    static bool is_interruption(std::exception_ptr& eptr) {
+      if (*eptr.__cxa_exception_type() == typeid(test_interruption))
+       return true;
+      return false;
+    }
+  };
+  using interruptor = crimson::interruptible::interruptor<test_interrupt_cond>;
+
+  using ertr = crimson::errorator<crimson::ct_error::invarg>;
+  using iertr = crimson::interruptible::interruptible_errorator<
+    test_interrupt_cond,
+    ertr>;
+
+  using ertr2 = ertr::extend<
+    crimson::ct_error::eagain>;
+  using iertr2 = crimson::interruptible::interruptible_errorator<
+    test_interrupt_cond,
+    ertr2>;
+
+  using ertr3 = ertr::extend<
+    crimson::ct_error::enoent>;
+  using iertr3 = crimson::interruptible::interruptible_errorator<
+    test_interrupt_cond,
+    ertr3>;
+
+  void interrupt() {
+    interruption_state.interrupted = true;
+  }
+
+  seastar::future<> set_up_fut() final {
+    interruption_state.interrupted = false;
+    return seastar::now();
+  }
+
+
+  template <typename E, typename F>
+  auto cwi(E &&errf, F &&f) {
+    return interruptor::with_interruption(
+      scl(std::forward<F>(f)),
+      std::forward<E>(errf),
+      &interruption_state);
+  }
+};
+
+namespace crimson::interruptible {
+template
+thread_local interrupt_cond_t<coroutine_test_t::test_interrupt_cond>
+interrupt_cond<coroutine_test_t::test_interrupt_cond>;
+}
+
+TEST_F(coroutine_test_t, test_coroutine)
+{
+  run_scl([]() -> seastar::future<> {
+    constexpr int CHECK = 20;
+    auto unwrapped = co_await seastar::make_ready_future<int>(CHECK);
+    EXPECT_EQ(unwrapped, CHECK);
+  });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_basic)
+{
+  run_ertr_scl([]() -> ertr::future<> {
+    constexpr int CHECK = 20;
+    auto unwrapped = co_await ertr::make_ready_future<int>(CHECK);
+    EXPECT_EQ(unwrapped, CHECK);
+  });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_vanilla_future)
+{
+  run_ertr_scl([]() -> ertr::future<> {
+    constexpr int CHECK = 20;
+    auto unwrapped = co_await seastar::make_ready_future<int>(CHECK);
+    EXPECT_EQ(unwrapped, CHECK);
+  });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_error)
+{
+  run_scl([this]() -> seastar::future<> {
+    auto fut = scl([]() -> ertr::future<int> {
+      std::ignore = co_await ertr::future<int>(
+       crimson::ct_error::invarg::make()
+      );
+      EXPECT_EQ("above co_await should throw", nullptr);
+      co_return 10;
+    })();
+    auto ret = co_await std::move(fut).handle_error(
+      [](const crimson::ct_error::invarg &e) {
+       return 20;
+      }
+    );
+    EXPECT_EQ(ret, 20);
+  });
+}
+
+#if 0
+// This one is left in, but commented out, as a test which *should fail to
+// build* due to trying to co_await a more errorated future.
+TEST_F(coroutine_test_t, test_basic_ertr_coroutine_error_should_not_build)
+{
+  run_ertr_scl([]() -> ertr::future<int> {
+    constexpr int CHECK = 20;
+    auto unwrapped = co_await ertr2::make_ready_future<int>(CHECK);
+    EXPECT_EQ(unwrapped, CHECK);
+    co_return 10;
+  });
+}
+#endif
+
+TEST_F(coroutine_test_t, interruptible_coroutine_basic)
+{
+  run_scl([this]() -> seastar::future<> {
+    seastar::promise<int> p;
+    auto ret = cwi(
+      [](auto) { return 2; },
+      [f=p.get_future()]() mutable -> interruptor::future<int> {
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       co_return x;
+      });
+    p.set_value(0);
+    auto awaited = co_await std::move(ret);
+    EXPECT_EQ(awaited, 0);
+  });
+}
+
+TEST_F(coroutine_test_t, interruptible_coroutine_interrupted)
+{
+  run_scl([this]() -> seastar::future<> {
+    seastar::promise<int> p;
+    auto ret = cwi(
+      [](auto) { return 2; },
+      [f=p.get_future()]() mutable -> interruptor::future<int> {
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       co_return x;
+      });
+    interrupt();
+    p.set_value(0);
+    auto awaited = co_await std::move(ret);
+    EXPECT_EQ(awaited, 2);
+  });
+}
+
+TEST_F(coroutine_test_t, dual_interruptible_coroutine)
+{
+  run_scl([this]() -> seastar::future<> {
+    seastar::promise<int> p, p2;
+    auto fut1 = cwi(
+      [](auto) { return 2; },
+      [&p, f=p2.get_future()]() mutable -> interruptor::future<int> {
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       p.set_value(1);
+       co_return x;
+      });
+    auto fut2 = cwi(
+      [](auto) { return 2; },
+      [&p2, f=p.get_future()]() mutable -> interruptor::future<int> {
+       p2.set_value(0);
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       co_return x;
+      });
+
+    auto ret1 = co_await std::move(fut1);
+    auto ret2 = co_await std::move(fut2);
+    EXPECT_EQ(ret1, 0);
+    EXPECT_EQ(ret2, 1);
+  });
+}
+
+TEST_F(coroutine_test_t, dual_interruptible_coroutine_interrupted)
+{
+  run_scl([this]() -> seastar::future<> {
+    seastar::promise<int> p, p2;
+    auto fut1 = cwi(
+      [](auto) { return 2; },
+      [this, &p, f=p2.get_future()]() mutable -> interruptor::future<int> {
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       interrupt();
+       p.set_value(1);
+       co_return x;
+      });
+    auto fut2 = cwi(
+      [](auto) { return 2; },
+      [&p2, f=p.get_future()]() mutable -> interruptor::future<int> {
+       p2.set_value(0);
+       auto x = co_await interruptor::make_interruptible(std::move(f));
+       co_return x;
+      });
+
+    auto ret1 = co_await std::move(fut1);
+    auto ret2 = co_await std::move(fut2);
+    EXPECT_EQ(ret1, 0);
+    EXPECT_EQ(ret2, 2);
+  });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_basic)
+{
+  run_ertr_scl([this]() -> ertr2::future<> {
+    auto ret = co_await cwi(
+      [](auto) { return 10; },
+      []() -> iertr::future<int> {
+       co_return 20;
+      });
+    EXPECT_EQ(ret, 20);
+  });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_as_error)
+{
+  run_ertr_scl([this]() -> ertr2::future<> {
+    auto ret = co_await cwi(
+      [](auto) {
+       return ertr2::future<int>(crimson::ct_error::eagain::make());
+      },
+      []() -> iertr::future<int> {
+       co_return 20;
+      });
+    EXPECT_EQ(ret, 20);
+  });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_as_error_interrupted)
+{
+  run_ertr_scl([this]() -> ertr::future<> {
+    seastar::promise<> p;
+    auto f = cwi(
+      [](auto) {
+       return ertr2::future<int>(crimson::ct_error::eagain::make());
+      },
+      [&p]() -> iertr::future<int> {
+        co_await iertr::make_interruptible(p.get_future());
+       co_return 20;
+      });
+    interrupt();
+    p.set_value();
+    auto ret = co_await f.handle_error(
+      crimson::ct_error::eagain::handle([](const auto &) {
+       return 30;
+      }),
+      crimson::ct_error::pass_further_all{}
+    );
+    EXPECT_EQ(ret, 30);
+  });
+}
+
+#if 0
+// the cwi invocation below would yield an ertr2 due to the interruption handler
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_should_not_compile)
+{
+  run_ertr_scl([this]() -> ertr::future<> {
+    auto ret = co_await cwi(
+      [](auto) {
+       ertr2::future<int>(crimson::ct_error::eagain::make());
+      },
+      []() -> iertr::future<int> {
+       co_return 20;
+      });
+    EXPECT_EQ(ret, 20);
+  });
+}
+#endif
+
+#if 0
+// can't co_await a vanilla future from an interruptible coroutine
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_should_not_compile2)
+{
+  run_ertr_scl([this]() -> ertr2::future<> {
+    auto ret = co_await cwi(
+      [](auto) {
+       return ertr2::future<int>(crimson::ct_error::eagain::make());
+      },
+      []() -> iertr::future<int> {
+       co_await seastar::now();
+       co_return 20;
+      });
+    EXPECT_EQ(ret, 20);
+  });
+}
+#endif
+