}
}
+template <typename InterruptCond, typename Errorator>
+struct interruptible_errorator;
+
+template <typename T>
+struct parallel_for_each_ret {
+ static_assert(seastar::is_future<T>::value);
+ using type = seastar::future<>;
+};
+
+template <template <typename...> typename ErroratedFuture, typename T>
+struct parallel_for_each_ret<
+ ErroratedFuture<
+ ::crimson::errorated_future_marker<T>>> {
+ using type = ErroratedFuture<::crimson::errorated_future_marker<void>>;
+};
+
+template <typename InterruptCond, typename FutureType>
+class parallel_for_each_state final : private seastar::continuation_base<> {
+ using elem_ret_t = std::conditional_t<
+ is_interruptible_future<FutureType>::value,
+ typename FutureType::core_type,
+ FutureType>;
+ using future_t = interruptible_future_detail<
+ InterruptCond,
+ typename parallel_for_each_ret<elem_ret_t>::type>;
+ std::vector<future_t> _incomplete;
+ seastar::promise<> _result;
+ std::exception_ptr _ex;
+private:
+ void wait_for_one() noexcept {
+ while (!_incomplete.empty() && _incomplete.back().available()) {
+ if (_incomplete.back().failed()) {
+ _ex = _incomplete.back().get_exception();
+ }
+ _incomplete.pop_back();
+ }
+ if (!_incomplete.empty()) {
+ seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+ _incomplete.pop_back();
+ return;
+ }
+ if (__builtin_expect(bool(_ex), false)) {
+ _result.set_exception(std::move(_ex));
+ } else {
+ _result.set_value();
+ }
+ delete this;
+ }
+ virtual void run_and_dispose() noexcept override {
+ if (_state.failed()) {
+ _ex = std::move(_state).get_exception();
+ }
+ _state = {};
+ wait_for_one();
+ }
+ task* waiting_task() noexcept override { return _result.waiting_task(); }
+public:
+ parallel_for_each_state(size_t n) {
+ _incomplete.reserve(n);
+ }
+ void add_future(future_t&& f) {
+ _incomplete.push_back(std::move(f));
+ }
+ future_t get_future() {
+ auto ret = _result.get_future();
+ wait_for_one();
+ return ret;
+ }
+ static future_t now() {
+ return seastar::now();
+ }
+};
+
template <typename InterruptCond, typename T>
class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>>
: private seastar::future<T> {
friend class ::seastar::internal::when_all_state_component;
template <typename Lock, typename Func>
friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+ template <typename IC, typename FT>
+ friend class parallel_for_each_state;
};
template <typename InterruptCond, typename Errorator>
using core_type::available;
using core_type::failed;
using core_type::core_type;
+ using core_type::get_exception;
using value_type = typename core_type::value_type;
}
private:
+ using core_type::_then;
template <typename Func>
[[gnu::always_inline]]
auto handle_interruption(Func&& func) {
friend class interruptible_future_detail;
template <typename Lock, typename Func>
friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+ template <typename IC, typename FT>
+ friend class parallel_for_each_state;
};
template <typename InterruptCond, typename T = void>
Func &&f, InterruptCond &&cond, Params&&... params) {
using func_result_t = std::invoke_result_t<Func, Params...>;
using func_ertr_t =
- typename seastar::template futurize<func_result_t>::errorator_type;
+ typename seastar::template futurize<
+ func_result_t>::core_type::errorator_type;
using with_trans_ertr =
typename func_ertr_t::template extend_ertr<errorator<Error>>;
::seastar::do_for_each(begin, end,
[action=std::move(action),
interrupt_condition=interrupt_cond<InterruptCond>]
- (decltype(*begin) x) {
+ (typename Iterator::reference x) mutable {
return call_with_interruption(
interrupt_condition,
std::move(action),
::crimson::do_for_each(begin, end,
[action=std::move(action),
interrupt_condition=interrupt_cond<InterruptCond>]
- (decltype(*begin) x) {
+ (typename Iterator::reference x) mutable {
return call_with_interruption(
interrupt_condition,
std::move(action),
::seastar::do_for_each(begin, end,
[action=std::move(action),
interrupt_condition=interrupt_cond<InterruptCond>]
- (decltype(*begin) x) {
+ (typename Iterator::reference x) mutable {
return call_with_interruption(
interrupt_condition,
std::move(action),
::crimson::do_for_each(begin, end,
[action=std::move(action),
interrupt_condition=interrupt_cond<InterruptCond>]
- (decltype(*begin) x) {
+ (typename Iterator::reference x) mutable {
return call_with_interruption(
interrupt_condition,
std::move(action),
}
}
- class parallel_for_each_state final : private seastar::continuation_base<> {
- using future_t = interruptible_future_detail<InterruptCond, seastar::future<>>;
- std::vector<future_t> _incomplete;
- seastar::promise<> _result;
- std::exception_ptr _ex;
- private:
- void wait_for_one() noexcept {
- while (!_incomplete.empty() && _incomplete.back().available()) {
- if (_incomplete.back().failed()) {
- _ex = _incomplete.back().get_exception();
- }
- _incomplete.pop_back();
- }
- if (!_incomplete.empty()) {
- seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
- _incomplete.pop_back();
- return;
- }
- if (__builtin_expect(bool(_ex), false)) {
- _result.set_exception(std::move(_ex));
- } else {
- _result.set_value();
- }
- delete this;
- }
- virtual void run_and_dispose() noexcept override {
- if (_state.failed()) {
- _ex = std::move(_state).get_exception();
- }
- _state = {};
- wait_for_one();
- }
- task* waiting_task() noexcept override { return _result.waiting_task(); }
- public:
- parallel_for_each_state(size_t n) {
- _incomplete.reserve(n);
- }
- void add_future(future_t&& f) {
- _incomplete.push_back(std::move(f));
- }
- future_t get_future() {
- auto ret = _result.get_future();
- wait_for_one();
- return ret;
- }
- };
template <typename Iterator, typename Func>
- static inline interruptible_future_detail<InterruptCond, seastar::future<>>
- parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
- parallel_for_each_state* s = nullptr;
+ static inline auto parallel_for_each(
+ Iterator begin,
+ Iterator end,
+ Func&& func
+ ) noexcept {
+ using ResultType = std::invoke_result_t<Func, typename Iterator::reference>;
+ parallel_for_each_state<InterruptCond, ResultType>* s = nullptr;
auto decorated_func =
[func=std::forward<Func>(func),
interrupt_condition=interrupt_cond<InterruptCond>]
using itraits = std::iterator_traits<Iterator>;
auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
begin, end, typename itraits::iterator_category()) + 1);
- s = new parallel_for_each_state(n);
+ s = new parallel_for_each_state<InterruptCond, ResultType>(n);
}
s->add_future(std::move(f));
}
// so this isn't a leak
return s->get_future();
}
- return seastar::make_ready_future<>();
+ return parallel_for_each_state<InterruptCond, ResultType>::now();
}
template <typename Container, typename Func>
using type = ::crimson::interruptible::interruptible_future_detail<
InterruptCond,
ErroratedFuture<::crimson::errorated_future_marker<T...>>>;
+ using core_type = ErroratedFuture<
+ ::crimson::errorated_future_marker<T...>>;
using errorator_type =
- typename ErroratedFuture<
- ::crimson::errorated_future_marker<T...>>::errorator_type;
+ ::crimson::interruptible::interruptible_errorator<
+ InterruptCond,
+ typename ErroratedFuture<
+ ::crimson::errorated_future_marker<T...>>::errorator_type>;
template<typename Func, typename... FuncArgs>
static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
template <typename Arg>
static inline type make_exception_future(Arg&& arg) noexcept {
- return errorator_type::template make_exception_future2<T...>(std::forward<Arg>(arg));
+ return core_type::errorator_type::template make_exception_future2<T...>(
+ std::forward<Arg>(arg));
}
template<typename PromiseT, typename Func>