]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add interruptible future facilities
authorXuehan Xu <xxhdx1985126@gmail.com>
Sun, 4 Oct 2020 09:50:48 +0000 (17:50 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Sun, 7 Mar 2021 06:24:47 +0000 (14:24 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/common/interruptible_future.h [new file with mode: 0644]
src/crimson/osd/CMakeLists.txt
src/crimson/osd/pg_interval_interrupt_condition.cc [new file with mode: 0644]
src/crimson/osd/pg_interval_interrupt_condition.h [new file with mode: 0644]

diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h
new file mode 100644 (file)
index 0000000..ef60b44
--- /dev/null
@@ -0,0 +1,1272 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future-util.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/when_all.hh>
+
+#include "crimson/common/errorator.h"
+
+// The interrupt condition generally works this way:
+//
+//  1. It is created by "enable_interruption" method, and is recorded in the thread
+//     local global variable "::crimson::interruptible::interrupt_cond" (each "enable_
+//     interruption" is paired with a "disable_interruption" to ensure that the global
+//     interrupt_cond won't hold unexpected interrupt conditions during the execution
+//     of unrelated continuations);
+//  2. Any continuation that's created within the execution of the continuation
+//     that calls the "enable_interruption" method will capture the "interrupt_cond";
+//     and when they starts to run, they will put that capture interruption condition
+//     into "::crimson::interruptible::interrupt_cond" so that further continuations
+//     created can also capture the interruption condition;
+//  3. At the end of the continuation run, the global "interrupt_cond" will be cleared
+//     to prevent other continuations that are not supposed to be interrupted wrongly
+//     capture an interruption condition.
+// With this approach, continuations capture the interrupt condition at their creation,
+// restore the interrupt conditions at the beginning of their execution and clear those
+// interrupt conditions at the end of their execution. So the global "interrupt_cond"
+// only hold valid interrupt conditions when the corresponding continuations are actually
+// running after which it gets cleared. Since continuations can't be executed simultaneously,
+// different continuation chains won't be able to interfere with each other.
+//
+// The global "interrupt_cond" can work as a signal about whether the continuation
+// is supposed to be interrupted, the reason that the global "interrupt_cond" and
+// "enable_interruption" are added is that there may be this scenario:
+//
+//     Say there's some method PG::func1(), in which the continuations created may
+//     or may not be supposed to be interrupted in different situations. If we don't
+//     have a global signal, we have to add an extra parameter to every method like
+//     PG::func1() to indicate whether the current run should create to-be-interrupted
+//     continuations or not.
+//
+// For users to insert an interruption condition, interruptor::with_interruption() in which
+// enable_interruption and disable_interruption would be called should be invoked. And users
+// can only handle interruption in the interruptible_future_detail::handle_interruption method.
+
+namespace crimson::interruptible {
+
+struct ready_future_marker {};
+struct exception_future_marker {};
+
+template <typename InterruptCond>
+class interruptible_future_builder;
+
+template <typename InterruptCond>
+struct interruptor;
+
+template <typename InterruptCond>
+using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>;
+
+template <typename InterruptCond>
+thread_local InterruptCondRef<InterruptCond> interrupt_cond;
+
+template <typename InterruptCond, typename FutureType>
+class interruptible_future_detail {};
+
+template <typename FutureType>
+struct is_interruptible_future : public std::false_type {};
+
+template <typename InterruptCond, typename FutureType>
+struct is_interruptible_future<
+  interruptible_future_detail<
+    InterruptCond,
+    FutureType>>
+  : public std::true_type {};
+
+namespace internal {
+
+template <typename InterruptCond, typename Func, typename... Args>
+auto call_with_interruption_impl(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func, Args&&... args)
+{
+  using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>;
+  // there might be a case like this:
+  //   with_interruption([] {
+  //           interruptor::do_for_each([] {
+  //                   ...
+  //                   return interruptible_errorated_future();
+  //           }).safe_then_interruptible([] {
+  //                   ...
+  //           });
+  //   })
+  // In this case, as crimson::do_for_each would directly do futurize_invoke
+  // for "call_with_interruption", we have make sure this invocation would
+  // errorly release ::crimson::interruptible::interrupt_cond<InterruptCond>
+  bool set_int_cond = false;
+
+  // If there exists an interrupt condition, which means "Func" may not be
+  // permitted to run as a result of the interruption, test it. If it does
+  // need to be interrupted, return an interruption; otherwise, restore the
+  // global "interrupt_cond" with the interruption condition, and go ahead
+  // executing the Func.
+  if (!interrupt_cond<InterruptCond> && interrupt_condition) {
+    auto [interrupt, fut] = interrupt_condition->template may_interrupt<
+      typename futurator_t::type>();
+    if (interrupt) {
+      return std::move(*fut);
+    }
+    set_int_cond = true;
+    interrupt_cond<InterruptCond> = interrupt_condition;
+  }
+
+  auto fut = seastar::futurize_invoke(
+      std::forward<Func>(func),
+      std::forward<Args>(args)...);
+  // Clear the global "interrupt_cond" to prevent it from interfering other
+  // continuation chains.
+  if (set_int_cond && interrupt_cond<InterruptCond>)
+    interrupt_cond<InterruptCond>.release();
+  return fut;
+}
+
+}
+
+template <typename InterruptCond, typename Func, typename Ret,
+         typename Result = std::invoke_result_t<Func, Ret>,
+         std::enable_if_t<!InterruptCond::template is_interruption_v<Ret> &&
+           seastar::is_future<Ret>::value, int> = 0>
+auto call_with_interruption(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func, Ret&& fut)
+{
+  // if "T" is already an interrupt exception, return it directly;
+  // otherwise, upper layer application may encounter errors executing
+  // the "Func" body.
+  if (fut.failed()) {
+    std::exception_ptr eptr = fut.get_exception();
+    if (interrupt_condition->is_interruption(eptr)) {
+      return seastar::futurize<Result>::make_exception_future(std::move(eptr));
+    }
+    return internal::call_with_interruption_impl(
+             interrupt_condition,
+             std::forward<Func>(func),
+             seastar::futurize<Ret>::make_exception_future(
+               std::move(eptr)));
+  }
+  return internal::call_with_interruption_impl(
+           interrupt_condition,
+           std::forward<Func>(func),
+           std::move(fut));
+}
+
+template <typename InterruptCond, typename Func, typename T,
+         typename Result = std::invoke_result_t<Func, T>,
+         std::enable_if_t<InterruptCond::template is_interruption_v<T>, int> = 0>
+auto call_with_interruption(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func, T&& arg)
+{
+  // if "T" is already an interrupt exception, return it directly;
+  // otherwise, upper layer application may encounter errors executing
+  // the "Func" body.
+  return seastar::futurize<Result>::make_exception_future(
+      std::get<0>(std::tuple(std::forward<T>(arg))));
+}
+
+template <typename InterruptCond, typename Func, typename T,
+         typename Result = std::invoke_result_t<Func, T>,
+         std::enable_if_t<
+           !InterruptCond::template is_interruption_v<T> &&
+             !seastar::is_future<T>::value, int> = 0>
+auto call_with_interruption(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func, T&& arg)
+{
+  return internal::call_with_interruption_impl(
+           interrupt_condition,
+           std::forward<Func>(func),
+           std::forward<T>(arg));
+}
+
+template <typename InterruptCond, typename Func,
+         typename Result = std::invoke_result_t<Func>>
+auto call_with_interruption(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func)
+{
+  return internal::call_with_interruption_impl(
+           interrupt_condition,
+           std::forward<Func>(func));
+}
+
+template <typename InterruptCond, typename Func, typename... T,
+         typename Result = std::invoke_result_t<Func, T...>>
+decltype(auto) non_futurized_call_with_interruption(
+  InterruptCondRef<InterruptCond> interrupt_condition,
+  Func&& func, T&&... args)
+{
+  bool set_int_cond = false;
+  if (!interrupt_cond<InterruptCond> && interrupt_condition) {
+    auto [interrupt, fut] = interrupt_condition->template may_interrupt<seastar::future<>>();
+    if (interrupt) {
+      std::rethrow_exception(fut->get_exception());
+    }
+    set_int_cond = true;
+    interrupt_cond<InterruptCond> = std::move(interrupt_condition);
+  }
+  try {
+    if constexpr (std::is_void_v<Result>) {
+      std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
+
+      // Clear the global "interrupt_cond" to prevent it from interfering other
+      // continuation chains.
+      if (set_int_cond && interrupt_cond<InterruptCond>)
+       interrupt_cond<InterruptCond>.release();
+      return;
+    } else {
+      auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
+      if (set_int_cond && interrupt_cond<InterruptCond>)
+       interrupt_cond<InterruptCond>.release();
+      return std::forward<Result>(err);
+    }
+  } catch (std::exception& e) {
+    // Clear the global "interrupt_cond" to prevent it from interfering other
+    // continuation chains.
+    if (set_int_cond && interrupt_cond<InterruptCond>)
+      interrupt_cond<InterruptCond>.release();
+    throw e;
+  }
+}
+
+template <typename InterruptCond, typename T>
+class interruptible_future_detail<InterruptCond, seastar::future<T>>
+  : private seastar::future<T> {
+public:
+  using core_type = seastar::future<T>;
+  template <typename U>
+  using interrupt_futurize_t =
+    typename interruptor<InterruptCond>::template futurize_t<U>;
+  using core_type::get0;
+  using core_type::core_type;
+
+  [[gnu::always_inline]]
+  interruptible_future_detail(seastar::future<T>&& base)
+    : core_type(std::move(base))
+  {}
+
+  using value_type = typename seastar::future<T>::value_type;
+  using tuple_type = typename seastar::future<T>::tuple_type;
+
+  [[gnu::always_inline]]
+  value_type&& get() {
+    return std::move(core_type::get());
+  }
+
+  using core_type::available;
+  using core_type::failed;
+
+  template <typename Func>
+  [[gnu::always_inline]]
+  auto handle_interruption(Func&& func) {
+    return core_type::then_wrapped(
+      [this, func=std::move(func),
+      interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable {
+      if (fut.failed()) {
+       std::exception_ptr ex = fut.get_exception();
+       if (interrupt_condition->is_interruption(ex)) {
+         return seastar::futurize_invoke(std::move(func), std::move(ex));
+       } else {
+         return seastar::make_exception_future<T>(std::move(ex));
+       }
+      } else {
+       return seastar::make_ready_future<T>(fut.get());
+      }
+    });
+  }
+
+  template <typename Func,
+           typename Result = interrupt_futurize_t<
+               std::invoke_result_t<Func, seastar::future<T>>>>
+  [[gnu::always_inline]]
+  Result then_wrapped_interruptible(Func&& func) {
+    assert(interrupt_cond<InterruptCond>);
+    return core_type::then_wrapped(
+      [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+      (auto&& fut) mutable {
+      return call_with_interruption(
+               std::move(interrupt_condition),
+               std::forward<Func>(func),
+               std::move(fut));
+    });
+  }
+
+  template <typename Func>
+  [[gnu::always_inline]]
+  auto then_interruptible(Func&& func) {
+    assert(interrupt_cond<InterruptCond>);
+    if constexpr (std::is_void_v<T>) {
+      auto fut = core_type::then(
+       [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+       () mutable {
+       return call_with_interruption(
+                 interrupt_condition,
+                 std::move(func));
+      });
+      return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+    } else {
+      auto fut = core_type::then(
+       [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+       (T&& arg) mutable {
+       return call_with_interruption(
+                 interrupt_condition,
+                 std::move(func),
+                 std::forward<T>(arg));
+      });
+      return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+    }
+  }
+
+  template <typename Func,
+           typename Result =interrupt_futurize_t<
+               std::result_of_t<Func(std::exception_ptr)>>>
+  [[gnu::always_inline]]
+  Result handle_exception_interruptible(Func&& func) {
+    assert(interrupt_cond<InterruptCond>);
+    return core_type::then_wrapped(
+      [func=std::forward<Func>(func),
+      interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable {
+      if (!fut.failed()) {
+       return seastar::make_ready_future<T>(fut.get());
+      } else {
+       return call_with_interruption(
+                 interrupt_condition,
+                 std::move(func),
+                 fut.get_exception());
+      }
+    });
+  }
+
+  template <bool may_interrupt = true, typename Func,
+           typename Result = interrupt_futurize_t<
+               std::result_of_t<Func()>>>
+  [[gnu::always_inline]]
+  Result finally_interruptible(Func&& func) {
+    if constexpr (may_interrupt) {
+      assert(interrupt_cond<InterruptCond>);
+      return core_type::then_wrapped(
+       [func=std::forward<Func>(func),
+       interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable {
+         return call_with_interruption(
+                   interrupt_condition,
+                   std::move(func));
+      });
+    } else {
+      return core_type::finally(std::forward<Func>(func));
+    }
+  }
+
+  template <typename Func,
+           typename Result = interrupt_futurize_t<
+               std::result_of_t<Func(
+                 typename seastar::function_traits<Func>::template arg<0>::type)>>>
+  [[gnu::always_inline]]
+  Result handle_exception_type_interruptible(Func&& func) {
+    assert(interrupt_cond<InterruptCond>);
+    using trait = seastar::function_traits<Func>;
+    static_assert(trait::arity == 1, "func can take only one parameter");
+    using ex_type = typename trait::template arg<0>::type;
+    return core_type::then_wrapped(
+      [func=std::forward<Func>(func),
+      interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable 
+      -> Result {
+      if (!fut.failed()) {
+       return seastar::make_ready_future<T>(fut.get());
+      } else {
+       try {
+         std::rethrow_exception(fut.get_exception());
+       } catch (ex_type& ex) {
+         return call_with_interruption(
+                   interrupt_condition,
+                   std::move(func), ex);
+       }
+      }
+    });
+  }
+
+private:
+  seastar::future<T> to_future() {
+    return static_cast<core_type&&>(std::move(*this));
+  }
+  // this is only supposed to be invoked by seastar functions
+  template <typename Func,
+           typename Result = interrupt_futurize_t<
+               std::result_of_t<Func(seastar::future<T>)>>>
+  [[gnu::always_inline]]
+  Result then_wrapped(Func&& func) {
+    return core_type::then_wrapped(
+      [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+      (auto&& fut) mutable {
+      return call_with_interruption(
+               interrupt_condition,
+               std::forward<Func>(func),
+               std::move(fut));
+    });
+  }
+  // this is only supposed to be invoked by seastar functions
+  template <typename Func,
+           typename Result = interrupt_futurize_t<
+               std::invoke_result_t<Func>>>
+  [[gnu::always_inline]]
+  Result finally(Func&& func) {
+    return core_type::finally(std::forward<Func>(func));
+  }
+  template <typename Iterator, typename AsyncAction, typename Result,
+           std::enable_if_t<
+             is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::do_for_each(
+           Iterator, Iterator, AsyncAction&&);
+  template <typename Iterator, typename AsyncAction, typename Result,
+           std::enable_if_t<
+             !is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::do_for_each(
+           Iterator, Iterator, AsyncAction&&);
+  template <typename AsyncAction, typename Result,
+           std::enable_if_t<
+             is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::repeat(AsyncAction&&);
+  template <typename AsyncAction, typename Result,
+           std::enable_if_t<
+             !is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::repeat(AsyncAction&&);
+  friend class interruptible_future_builder<InterruptCond>;
+  template <typename U>
+  friend struct ::seastar::futurize;
+  template <typename>
+  friend class ::seastar::future;
+  template <typename HeldState, typename Future>
+  friend class seastar::internal::do_with_state;
+  template<typename TX, typename F>
+  friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
+  template<typename T1, typename T2, typename T3_or_F, typename... More>
+  friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+  template <typename T1, typename T2, typename... More>
+  friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
+  template <typename, typename>
+  friend class ::crimson::maybe_handle_error_t;
+  template <typename>
+  friend class ::seastar::internal::extract_values_from_futures_vector;
+  friend class interruptor<InterruptCond>::parallel_for_each_state;
+  template <typename, typename>
+  friend class interruptible_future_detail;
+  template <typename ResolvedVectorTransform, typename Future>
+  friend inline typename ResolvedVectorTransform::future_type
+  seastar::internal::complete_when_all(
+      std::vector<Future>&& futures,
+      typename std::vector<Future>::iterator pos) noexcept;
+  template <typename>
+  friend class ::seastar::internal::when_all_state_component;
+  template <typename Lock, typename Func>
+  friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+};
+
+template <typename InterruptCond, typename Errorator>
+struct interruptible_errorator {
+  template <typename ValueT = void>
+  using future = interruptible_future_detail<InterruptCond,
+       typename Errorator::template future<ValueT>>;
+
+  template <typename ValueT = void, typename A>
+  static interruptible_future_detail<
+    InterruptCond,
+    typename Errorator::template future<ValueT>>
+  make_ready_future(A&& value) {
+    return interruptible_future_detail<
+      InterruptCond, typename Errorator::template future<ValueT>>(
+       Errorator::template make_ready_future<ValueT>(
+         std::forward<A>(value)));
+  }
+  template <typename ValueT = void>
+  static interruptible_future_detail<
+    InterruptCond,
+    typename Errorator::template future<ValueT>>
+  make_ready_future() {
+    return interruptible_future_detail<
+      InterruptCond, typename Errorator::template future<ValueT>>(
+       Errorator::template make_ready_future<ValueT>());
+  }
+  static interruptible_future_detail<
+    InterruptCond,
+    typename Errorator::template future<>> now() {
+    return interruptible_future_detail<
+      InterruptCond, typename Errorator::template future<>>(
+       Errorator::now());
+  }
+};
+
+template <typename InterruptCond,
+         template <typename...> typename ErroratedFuture,
+         typename T>
+class interruptible_future_detail<
+  InterruptCond,
+  ErroratedFuture<::crimson::errorated_future_marker<T>>>
+  : private ErroratedFuture<::crimson::errorated_future_marker<T>>
+{
+public:
+  using core_type = ErroratedFuture<::crimson::errorated_future_marker<T>>;
+  template <typename U>
+  using interrupt_futurize_t =
+    typename interruptor<InterruptCond>::template futurize_t<U>;
+
+  using core_type::available;
+  using core_type::failed;
+  using core_type::core_type;
+
+  interruptible_future_detail(seastar::future<T>&& fut)
+    : core_type(std::move(fut))
+  {}
+
+  template <template <typename...> typename ErroratedFuture2,
+           typename... U>
+  [[gnu::always_inline]]
+  interruptible_future_detail(
+    ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut)
+    : core_type(std::move(fut)) {
+    using src_errorator_t = \
+      typename ErroratedFuture2<
+       ::crimson::errorated_future_marker<U...>>::errorator_type;
+    static_assert(core_type::errorator_type::template contains_once_v<
+                   src_errorator_t>,
+                 "conversion is only possible from less-or-eq errorated future!");
+  }
+
+  template <template <typename...> typename ErroratedFuture2,
+           typename... U>
+  [[gnu::always_inline]]
+  interruptible_future_detail(
+    interruptible_future_detail<InterruptCond,
+      ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut)
+    : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) {
+    using src_errorator_t = \
+      typename ErroratedFuture2<
+       ::crimson::errorated_future_marker<U...>>::errorator_type;
+    static_assert(core_type::errorator_type::template contains_once_v<
+                   src_errorator_t>,
+                 "conversion is only possible from less-or-eq errorated future!");
+  }
+
+  [[gnu::always_inline]]
+  interruptible_future_detail(
+    interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut)
+    : core_type(static_cast<seastar::future<T>&&>(fut)) {}
+
+  template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+          std::enable_if_t<!interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+    auto fut = core_type::safe_then(
+       std::forward<ValueInterruptCondT>(valfunc),
+       std::forward<ErrorVisitorT>(errfunc));
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+
+  template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+          typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+    assert(interrupt_cond<InterruptCond>);
+    auto fut = core_type::safe_then(
+      [func=std::move(valfunc), interrupt_condition=interrupt_cond<InterruptCond>]
+      (T&& args) mutable {
+      return call_with_interruption(
+               interrupt_condition,
+               std::move(func),
+               std::forward<T>(args));
+      }, [func=std::move(errfunc),
+         interrupt_condition=interrupt_cond<InterruptCond>]
+         (auto&& err) mutable -> decltype(auto) {
+         constexpr bool return_void = std::is_void_v<
+           std::invoke_result_t<ErrorVisitorT,
+             std::decay_t<decltype(err)>>>;
+         constexpr bool return_err = ::crimson::is_error_v<
+           std::decay_t<std::invoke_result_t<ErrorVisitorT,
+             std::decay_t<decltype(err)>>>>;
+         if constexpr (return_err || return_void) {
+           return non_futurized_call_with_interruption(
+                     interrupt_condition,
+                     std::move(func),
+                     std::move(err));
+         } else {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(func),
+                     std::move(err));
+         }
+      });
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+          typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+    assert(interrupt_cond<InterruptCond>);
+    auto fut = core_type::safe_then(
+      [func=std::move(valfunc), interrupt_condition=interrupt_cond<InterruptCond>]
+      () mutable {
+      return call_with_interruption(
+               interrupt_condition,
+               std::move(func));
+      }, [func=std::move(errfunc),
+         interrupt_condition=interrupt_cond<InterruptCond>]
+         (auto&& err) mutable -> decltype(auto) {
+         constexpr bool return_void = std::is_void_v<
+           std::invoke_result_t<ErrorVisitorT,
+             std::decay_t<decltype(err)>>>;
+         constexpr bool return_err = ::crimson::is_error_v<
+           std::decay_t<std::invoke_result_t<ErrorVisitorT,
+             std::decay_t<decltype(err)>>>>;
+         if constexpr (return_err || return_void) {
+           return non_futurized_call_with_interruption(
+                     interrupt_condition,
+                     std::move(func),
+                     std::move(err));
+         } else {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(func),
+                     std::move(err));
+         }
+      });
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template <bool interruptible = true, typename ValueInterruptCondT,
+           typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+    assert(interrupt_cond<InterruptCond>);
+    auto fut = core_type::safe_then(
+      [func=std::move(valfunc),
+       interrupt_condition=interrupt_cond<InterruptCond>]
+      () mutable {
+      return call_with_interruption(
+               interrupt_condition,
+               std::move(func));
+    });
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template <bool interruptible = true, typename ValueInterruptCondT,
+           typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+    assert(interrupt_cond<InterruptCond>);
+    auto fut = core_type::safe_then(
+      [func=std::move(valfunc),
+       interrupt_condition=interrupt_cond<InterruptCond>]
+      (T&& arg) mutable {
+      return call_with_interruption(
+               interrupt_condition,
+               std::move(func),
+               std::forward<T>(arg));
+    });
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template <bool interruptible = true, typename ValueInterruptCondT,
+           std::enable_if_t<!interruptible, int> = 0>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+    auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc));
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template <typename ValueInterruptCondT,
+           typename ErrorVisitorHeadT,
+           typename... ErrorVisitorTailT>
+  [[gnu::always_inline]]
+  auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc,
+                              ErrorVisitorHeadT&& err_func_head,
+                              ErrorVisitorTailT&&... err_func_tail) {
+    return safe_then_interruptible(
+       std::forward<ValueInterruptCondT>(valfunc),
+       ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
+                           std::forward<ErrorVisitorTailT>(err_func_tail)...));
+  }
+
+  template <typename ErrorFunc>
+  auto handle_error_interruptible(ErrorFunc&& errfunc) {
+    assert(interrupt_cond<InterruptCond>);
+    auto fut = core_type::handle_error(
+      [errfunc=std::move(errfunc),
+       interrupt_condition=interrupt_cond<InterruptCond>]
+      (auto&& err) mutable -> decltype(auto) {
+       constexpr bool return_void = std::is_void_v<
+         std::invoke_result_t<ErrorFunc,
+           std::decay_t<decltype(err)>>>;
+       constexpr bool return_err = ::crimson::is_error_v<
+         std::decay_t<std::invoke_result_t<ErrorFunc,
+           std::decay_t<decltype(err)>>>>;
+       if constexpr (return_err || return_void) {
+         return non_futurized_call_with_interruption(
+                   interrupt_condition,
+                   std::move(errfunc),
+                   std::move(err));
+       } else {
+         return call_with_interruption(
+                   interrupt_condition,
+                   std::move(errfunc),
+                   std::move(err));
+       }
+      });
+    return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+  }
+
+  template <typename ErrorFuncHead,
+           typename... ErrorFuncTail>
+  auto handle_error_interruptible(ErrorFuncHead&& error_func_head,
+                                 ErrorFuncTail&&... error_func_tail) {
+    assert(interrupt_cond<InterruptCond>);
+    static_assert(sizeof...(ErrorFuncTail) > 0);
+    return this->handle_error_interruptible(
+      ::crimson::composer(
+       std::forward<ErrorFuncHead>(error_func_head),
+       std::forward<ErrorFuncTail>(error_func_tail)...));
+  }
+private:
+  ErroratedFuture<::crimson::errorated_future_marker<T>>
+  to_future() {
+    return static_cast<core_type&&>(std::move(*this));
+  }
+
+  template <typename Iterator, typename AsyncAction, typename Result,
+           std::enable_if_t<
+             is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::do_for_each(
+           Iterator, Iterator, AsyncAction&&);
+  template <typename Iterator, typename AsyncAction, typename Result,
+           std::enable_if_t<
+             !is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::do_for_each(
+           Iterator, Iterator, AsyncAction&&);
+  template <typename AsyncAction, typename Result,
+           std::enable_if_t<
+             is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::repeat(AsyncAction&&);
+  template <typename AsyncAction, typename Result,
+           std::enable_if_t<
+             !is_interruptible_future<Result>::value, int>>
+  friend auto interruptor<InterruptCond>::repeat(AsyncAction&&);
+  friend class interruptible_future_builder<InterruptCond>;
+  template <typename U>
+  friend struct ::seastar::futurize;
+  template <typename>
+  friend class ::seastar::future;
+  template<typename TX, typename F>
+  friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
+  template<typename T1, typename T2, typename T3_or_F, typename... More>
+  friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+  template <typename T1, typename T2, typename... More>
+  friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
+  template <typename HeldState, typename Future>
+  friend class seastar::internal::do_with_state;
+  template <typename, typename>
+  friend class ::crimson::maybe_handle_error_t;
+  template <typename, typename>
+  friend class interruptible_future_detail;
+  template <typename Lock, typename Func>
+  friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+};
+
+template <typename InterruptCond, typename T = void>
+using interruptible_future =
+  interruptible_future_detail<InterruptCond, seastar::future<T>>;
+
+template <typename InterruptCond, typename Errorator, typename T = void>
+using interruptible_errorated_future =
+  interruptible_future_detail<
+    InterruptCond,
+    typename Errorator::template future<T>>;
+
+template <typename InterruptCond>
+struct interruptor
+{
+public:
+  template <typename FutureType>
+  [[gnu::always_inline]]
+  static interruptible_future_detail<InterruptCond, FutureType>
+  make_interruptible(FutureType&& fut) {
+    return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut));
+  }
+
+  [[gnu::always_inline]]
+  static interruptible_future_detail<InterruptCond, seastar::future<>> now() {
+    return interruptible_future_detail<
+             InterruptCond,
+             seastar::future<>>(seastar::now());
+  }
+
+  template <typename T>
+  struct futurize {
+    using type = interruptible_future_detail<
+      InterruptCond, typename seastar::futurize<T>::type>;
+
+    template <typename Func, typename... Args>
+    static type apply(Func&& func, std::tuple<Args...>&& args) noexcept {
+      return seastar::futurize<T>::apply(std::forward<Func>(func),
+                                        std::forward<std::tuple<Args...>>(args));
+    }
+  };
+
+  template <typename FutureType>
+  struct futurize<interruptible_future_detail<InterruptCond, FutureType>> {
+    using type = interruptible_future_detail<InterruptCond, FutureType>;
+
+    template <typename Func, typename... Args>
+    static type apply(Func&& func, std::tuple<Args...>&& args) noexcept {
+      return seastar::futurize<FutureType>::apply(
+         std::forward<Func>(func),
+         std::forward<std::tuple<Args...>>(args));
+    }
+  };
+
+  template <typename T>
+  using futurize_t = typename futurize<T>::type;
+
+  template <typename Func, typename... Args>
+  static auto futurize_apply(Func&& func, std::tuple<Args...>&& args) noexcept {
+    using futurator = futurize<std::result_of_t<Func(Args&&...)>>;
+    return futurator::apply(std::forward<Func>(func), std::move(args));
+  }
+
+  template <typename Container, typename AsyncAction>
+  [[gnu::always_inline]]
+  static auto do_for_each(Container& c, AsyncAction&& action) {
+    return do_for_each(std::begin(c), std::end(c),
+             std::forward<AsyncAction>(action));
+  }
+
+  template <typename OpFunc, typename OnInterrupt,
+           typename... InterruptCondParams>
+  static inline auto with_interruption(
+    OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params) {
+      // there may case like:
+      //       with_interruption([] {
+      //               return ...
+      //               .then_interruptible([] {
+      //                       with_interruption(...);
+      //               })
+      //       });
+      // in which the inner with_interruption might errorly release
+      // ::crimson::interruptible::interrupt_cond<InterruptCond>,
+      // so we have to avoid this scenario.
+      //
+      // TODO: maybe some kind of interrupt_cond stack should be implemented?
+      bool created = enable_interruption(
+         std::forward<InterruptCondParams>(params)...);
+      auto fut = futurize<std::result_of_t<OpFunc()>>::apply(
+           std::move(opfunc), std::make_tuple())
+       .template handle_interruption(std::move(efunc));
+      if (__builtin_expect(created, true)) {
+       disable_interruption();
+      }
+      return fut;
+  }
+
+  template <typename Iterator, typename AsyncAction,
+           typename Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>,
+           std::enable_if_t<is_interruptible_future<Result>::value, int> = 0>
+  [[gnu::always_inline]]
+  static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
+    if constexpr (seastar::is_future<typename Result::core_type>::value) {
+      return make_interruptible(
+         ::seastar::do_for_each(begin, end,
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>]
+           (decltype(*begin) x) {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action),
+                     std::forward<decltype(*begin)>(x)).to_future();
+         })
+      );
+    } else {
+      return make_interruptible(
+         ::crimson::do_for_each(begin, end,
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>]
+           (decltype(*begin) x) {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action),
+                     std::forward<decltype(*begin)>(x)).to_future();
+         })
+      );
+    }
+  }
+
+  template <typename Iterator, typename AsyncAction,
+           typename Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>,
+           std::enable_if_t<!is_interruptible_future<Result>::value, int> = 0>
+  [[gnu::always_inline]]
+  static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
+    if constexpr (seastar::is_future<Result>::value) {
+      return make_interruptible(
+         ::seastar::do_for_each(begin, end,
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>]
+           (decltype(*begin) x) {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action),
+                     std::forward<decltype(*begin)>(x));
+         })
+      );
+    } else {
+      return make_interruptible(
+         ::crimson::do_for_each(begin, end,
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>]
+           (decltype(*begin) x) {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action),
+                     std::forward<decltype(*begin)>(x));
+         })
+      );
+    }
+  }
+
+  template <typename AsyncAction,
+           typename Result = std::invoke_result_t<AsyncAction>,
+           std::enable_if_t<is_interruptible_future<Result>::value, int> = 0>
+  [[gnu::always_inline]]
+  static auto repeat(AsyncAction&& action) {
+    if constexpr (seastar::is_future<typename Result::core_type>::value) {
+      return make_interruptible(
+         ::seastar::repeat(
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>] {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action)).to_future();
+         })
+      );
+    } else {
+      return make_interruptible(
+         ::crimson::do_until(
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>] {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action)).to_future();
+         })
+      );
+    }
+  }
+  template <typename AsyncAction,
+           typename Result = std::invoke_result_t<AsyncAction>,
+           std::enable_if_t<
+             !is_interruptible_future<Result>::value, int> = 0>
+  [[gnu::always_inline]]
+  static auto repeat(AsyncAction&& action) {
+    if constexpr (seastar::is_future<Result>::value) {
+      return make_interruptible(
+         ::seastar::repeat(
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>] {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action));
+         })
+      );
+    } else {
+      return make_interruptible(
+         ::crimson::do_until(
+           [action=std::move(action),
+           interrupt_condition=interrupt_cond<InterruptCond>] {
+           return call_with_interruption(
+                     interrupt_condition,
+                     std::move(action));
+         })
+      );
+    }
+  }
+
+  class parallel_for_each_state final : private seastar::continuation_base<> {
+    using future_t = interruptible_future_detail<InterruptCond, seastar::future<>>;
+    std::vector<future_t> _incomplete;
+    seastar::promise<> _result;
+    std::exception_ptr _ex;
+  private:
+    void wait_for_one() noexcept {
+      while (!_incomplete.empty() && _incomplete.back().available()) {
+       if (_incomplete.back().failed()) {
+         _ex = _incomplete.back().get_exception();
+       }
+       _incomplete.pop_back();
+      }
+      if (!_incomplete.empty()) {
+       seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+       _incomplete.pop_back();
+       return;
+      }
+      if (__builtin_expect(bool(_ex), false)) {
+       _result.set_exception(std::move(_ex));
+      } else {
+       _result.set_value();
+      }
+      delete this;
+    }
+    virtual void run_and_dispose() noexcept override {
+      if (_state.failed()) {
+       _ex = std::move(_state).get_exception();
+      }
+      _state = {};
+      wait_for_one();
+    }
+    task* waiting_task() noexcept override { return _result.waiting_task(); }
+  public:
+    parallel_for_each_state(size_t n) {
+      _incomplete.reserve(n);
+    }
+    void add_future(future_t&& f) {
+      _incomplete.push_back(std::move(f));
+    }
+    future_t get_future() {
+      auto ret = _result.get_future();
+      wait_for_one();
+      return ret;
+    }
+  };
+  template <typename Iterator, typename Func>
+  static inline interruptible_future_detail<InterruptCond, seastar::future<>>
+  parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
+    parallel_for_each_state* s = nullptr;
+    auto decorated_func =
+      [func=std::forward<Func>(func),
+      interrupt_condition=interrupt_cond<InterruptCond>]
+      (decltype(*Iterator())&& x) mutable {
+       return call_with_interruption(
+                 interrupt_condition,
+                 std::forward<Func>(func),
+                 std::forward<decltype(*begin)>(x));
+      };
+    // Process all elements, giving each future the following treatment:
+    //   - available, not failed: do nothing
+    //   - available, failed: collect exception in ex
+    //   - not available: collect in s (allocating it if needed)
+    while (begin != end) {
+      auto f = seastar::futurize_invoke(decorated_func, *begin++);
+      if (!f.available() || f.failed()) {
+       if (!s) {
+         using itraits = std::iterator_traits<Iterator>;
+         auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+               begin, end, typename itraits::iterator_category()) + 1);
+         s = new parallel_for_each_state(n);
+       }
+       s->add_future(std::move(f));
+      }
+    }
+    // If any futures were not available, hand off to parallel_for_each_state::start().
+    // Otherwise we can return a result immediately.
+    if (s) {
+      // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+      // so this isn't a leak
+      return s->get_future();
+    }
+    return seastar::make_ready_future<>();
+  }
+
+  template <typename Container, typename Func>
+  static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+    return parallel_for_each(
+           std::begin(container),
+           std::end(container),
+           std::forward<Func>(func));
+  }
+
+  template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
+  static inline interruptible_future<InterruptCond, Initial> map_reduce(
+    Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) {
+    struct state {
+      Initial result;
+      Reduce reduce;
+    };
+    auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)});
+    interruptible_future<InterruptCond> ret = seastar::make_ready_future<>();
+    while (begin != end) {
+        ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible(
+           [s = s.get(), ret = std::move(ret)] (auto f) mutable {
+            try {
+                s->result = s->reduce(std::move(s->result), std::move(f.get0()));
+                return std::move(ret);
+            } catch (...) {
+                return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) {
+                    f.ignore_ready_future();
+                    return seastar::make_exception_future<>(ex);
+                });
+            }
+        });
+    }
+    return ret.then_interruptible([s] {
+        return seastar::make_ready_future<Initial>(std::move(s->result));
+    });
+  }
+  template <typename Range, typename Mapper, typename Initial, typename Reduce>
+  static inline interruptible_future<InterruptCond, Initial> map_reduce(
+    Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) {
+    return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
+                     std::move(initial), std::move(reduce));
+  }
+
+  template<typename Fut, 
+          std::enable_if_t<
+            seastar::is_future<Fut>::value
+            || is_interruptible_future<Fut>::value, int> = 0>
+  static auto futurize_invoke_if_func(Fut&& fut) noexcept {
+       return std::forward<Fut>(fut);
+  }
+
+  template<typename Func,
+          std::enable_if_t<
+            !seastar::is_future<Func>::value
+            && !is_interruptible_future<Func>::value, int> = 0>
+  static auto futurize_invoke_if_func(Func&& func) noexcept {
+       return seastar::futurize_invoke(std::forward<Func>(func));
+  }
+
+  template <typename... FutOrFuncs>
+  static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept {
+    return ::seastar::internal::when_all_impl(
+       futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
+  }
+private:
+  // return true if an new interrupt condition is created and false otherwise
+  template <typename... Args>
+  [[gnu::always_inline]]
+  static bool enable_interruption(Args&&... args) {
+    if (!interrupt_cond<InterruptCond>){
+      interrupt_cond<InterruptCond> = seastar::make_lw_shared<
+       InterruptCond>(std::forward<Args>(args)...);
+      return true;
+    }
+    return false;
+  }
+
+  [[gnu::always_inline]]
+  static void disable_interruption() {
+    if (interrupt_cond<InterruptCond>)
+      interrupt_cond<InterruptCond>.release();
+  }
+};
+
+} // namespace crimson::interruptible
+
+namespace seastar {
+
+template <typename InterruptCond, typename... T>
+struct futurize<::crimson::interruptible::interruptible_future_detail<
+  InterruptCond, seastar::future<T...>>> {
+  using type = ::crimson::interruptible::interruptible_future_detail<
+    InterruptCond, seastar::future<T...>>;
+
+  using value_type = typename type::value_type;
+  using tuple_type = typename type::tuple_type;
+
+  static type from_tuple(tuple_type&& value) {
+    return type(ready_future_marker(), std::move(value));
+  }
+  static type from_tuple(const tuple_type& value) {
+    return type(ready_future_marker(), value);
+  }
+  static type from_tuple(value_type&& value) {
+    return type(ready_future_marker(), std::move(value));
+  }
+  static type from_tuple(const value_type& value) {
+    return type(ready_future_marker(), value);
+  }
+
+  template <typename Func, typename... FuncArgs>
+  [[gnu::always_inline]]
+  static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
+    try {
+      return func(std::forward<FuncArgs>(args)...);
+    } catch (...) {
+      return make_exception_future(std::current_exception());
+    }
+  }
+
+  template <typename Func>
+  [[gnu::always_inline]]
+  static type invoke(Func&& func, seastar::internal::monostate) noexcept {
+    try {
+      return ::seastar::futurize_invoke(std::forward<Func>(func));
+    } catch (...) {
+      return make_exception_future(std::current_exception());
+    }
+  }
+
+  template <typename Arg>
+  static inline type make_exception_future(Arg&& arg) noexcept {
+    return seastar::make_exception_future<T...>(std::forward<Arg>(arg));
+  }
+
+  static inline type make_exception_future(future_state_base&& state) noexcept {
+    return seastar::internal::make_exception_future<T...>(std::move(state));
+  }
+
+  template<typename PromiseT, typename Func>
+  static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
+    func().forward_to(std::move(pr));
+  }
+};
+
+template <typename InterruptCond,
+         template <typename...> typename ErroratedFuture,
+         typename... T>
+struct futurize<
+  ::crimson::interruptible::interruptible_future_detail<
+    InterruptCond,
+    ErroratedFuture<::crimson::errorated_future_marker<T...>>
+  >
+> {
+  using type = ::crimson::interruptible::interruptible_future_detail<
+    InterruptCond,
+    ErroratedFuture<::crimson::errorated_future_marker<T...>>>;
+  using errorator_type =
+    typename ErroratedFuture<
+      ::crimson::errorated_future_marker<T...>>::errorator_type;
+
+  template<typename Func, typename... FuncArgs>
+  static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
+    try {
+        return func(std::forward<FuncArgs>(args)...);
+    } catch (...) {
+        return make_exception_future(std::current_exception());
+    }
+  }
+
+  template <typename Func>
+  [[gnu::always_inline]]
+  static type invoke(Func&& func, seastar::internal::monostate) noexcept {
+    try {
+      return ::seastar::futurize_invoke(std::forward<Func>(func));
+    } catch (...) {
+      return make_exception_future(std::current_exception());
+    }
+  }
+
+  template <typename Arg>
+  static inline type make_exception_future(Arg&& arg) noexcept {
+    return errorator_type::template make_exception_future2<T...>(std::forward<Arg>(arg));
+  }
+
+  template<typename PromiseT, typename Func>
+  static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
+    func().forward_to(std::move(pr));
+  }
+
+};
+
+template <typename InterruptCond, typename FutureType>
+struct continuation_base_from_future<
+  ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> {
+  using type = typename seastar::continuation_base_from_future<FutureType>::type;
+};
+
+} // namespace seastar
index 898f70c423697adc01244e8fdcd963ad552bd476..592503068f393f5f7d66657c7ecf85b80cc4ce56 100644 (file)
@@ -27,6 +27,7 @@ add_executable(crimson-osd
   scheduler/mclock_scheduler.cc
   osdmap_gate.cc
   pg_map.cc
+  pg_interval_interrupt_condition.cc
   objclass.cc
   ${PROJECT_SOURCE_DIR}/src/objclass/class_api.cc
   ${PROJECT_SOURCE_DIR}/src/osd/ClassHandler.cc
diff --git a/src/crimson/osd/pg_interval_interrupt_condition.cc b/src/crimson/osd/pg_interval_interrupt_condition.cc
new file mode 100644 (file)
index 0000000..6be3193
--- /dev/null
@@ -0,0 +1,24 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_interrupt_condition.h"
+#include "pg.h"
+
+namespace crimson::osd {
+
+IOInterruptCondition::IOInterruptCondition(Ref<PG>& pg)
+  : pg(pg), e(pg->get_osdmap_epoch()) {}
+
+epoch_t IOInterruptCondition::get_current_osdmap_epoch() {
+  return pg->get_osdmap_epoch();
+}
+
+bool IOInterruptCondition::is_stopping() {
+  return pg->stopping;
+}
+
+bool IOInterruptCondition::is_primary() {
+  return pg->is_primary();
+}
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/pg_interval_interrupt_condition.h b/src/crimson/osd/pg_interval_interrupt_condition.h
new file mode 100644 (file)
index 0000000..4ad5041
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "include/types.h"
+#include "crimson/common/errorator.h"
+#include "crimson/common/exception.h"
+#include "crimson/common/type_helpers.h"
+
+namespace crimson::osd {
+
+class PG;
+
+class IOInterruptCondition {
+public:
+  IOInterruptCondition(Ref<PG>& pg);
+
+  epoch_t get_current_osdmap_epoch();
+
+  bool is_stopping();
+
+  bool is_primary();
+
+  template <typename T>
+  std::pair<bool, std::optional<T>> may_interrupt() {
+    if (e != get_current_osdmap_epoch()) {
+      return std::pair<bool, std::optional<T>>(
+               true, seastar::futurize<T>::make_exception_future(
+                         ::crimson::common::actingset_changed(is_primary())));
+    }
+    if (is_stopping()) {
+      return {true, seastar::futurize<Fut>::make_exception_future(
+        ::crimson::common::system_shutdown_exception())};
+    }
+    return {false, std::optional<Fut>()};
+  }
+
+  template <typename T>
+  static constexpr bool is_interruption_v =
+    std::is_same_v<T, ::crimson::common::actingset_changed>
+    || std::is_same_v<T, ::crimson::common::system_shutdown_exception>;
+
+  bool is_interruption(std::exception_ptr& eptr) {
+    return (*eptr.__cxa_exception_type() ==
+            typeid(::crimson::common::actingset_changed) ||
+            *eptr.__cxa_exception_type() ==
+            typeid(::crimson::common::system_shutdown_exception));
+  }
+
+private:
+  Ref<PG> pg;
+  epoch_t e;
+};
+
+} // namespace crimson::osd