namespace crimson {
-template<typename Iterator, typename AsyncAction>
-inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) {
- using futurator = \
- ::seastar::futurize<std::invoke_result_t<AsyncAction, decltype(*begin)>>;
+// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT
+template <typename Iterator, typename AsyncAction, typename FutureT>
+class do_for_each_state final : public seastar::continuation_base<> {
+ Iterator _begin;
+ Iterator _end;
+ AsyncAction _action;
+ seastar::promise<> _pr;
- if (begin == end) {
- return futurator::type::errorator_type::template make_ready_future<>();
+public:
+ do_for_each_state(Iterator begin, Iterator end, AsyncAction action,
+ FutureT&& first_unavailable)
+ : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
+ seastar::internal::set_callback(std::move(first_unavailable), this);
}
- while (true) {
- auto f = futurator::invoke(action, *begin);
- ++begin;
- if (begin == end) {
- return f;
+ virtual void run_and_dispose() noexcept override {
+ std::unique_ptr<do_for_each_state> zis(this);
+ if (_state.failed()) {
+ _pr.set_urgent_state(std::move(_state));
+ return;
}
- if (!f.available() || seastar::need_preempt()) {
- return std::move(f)._then(
- [ action = std::move(action),
- begin = std::move(begin),
- end = std::move(end)
- ] () mutable {
- return ::crimson::do_for_each(std::move(begin),
- std::move(end),
- std::move(action));
- });
+ while (_begin != _end) {
+ auto f = seastar::futurize_invoke(_action, *_begin);
+ ++_begin;
+ if (f.failed()) {
+ f._forward_to(std::move(_pr));
+ return;
+ }
+ if (!f.available() || seastar::need_preempt()) {
+ _state = {};
+ seastar::internal::set_callback(std::move(f), this);
+ zis.release();
+ return;
+ }
}
+ _pr.set_value();
+ }
+ task* waiting_task() noexcept override {
+ return _pr.waiting_task();
+ }
+ FutureT get_future() {
+ return _pr.get_future();
+ }
+};
+
+template<typename Iterator, typename AsyncAction,
+ typename FutureT = std::invoke_result_t<AsyncAction, typename Iterator::reference>>
+inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
+ while (begin != end) {
+ auto f = seastar::futurize_invoke(action, *begin);
+ ++begin;
if (f.failed()) {
return f;
}
+ if (!f.available() || seastar::need_preempt()) {
+ // s will be freed by run_and_dispose()
+ auto* s = new crimson::do_for_each_state<Iterator, AsyncAction, FutureT>{
+ std::move(begin), std::move(end), std::move(action), std::move(f)};
+ return s->get_future();
+ }
}
+ return seastar::make_ready_future<>();
}
+
+template<typename Iterator, typename AsyncAction>
+inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) {
+ return ::crimson::do_for_each_impl(begin, end, std::move(action));
+}
+
template<typename Container, typename AsyncAction>
inline auto do_for_each(Container& c, AsyncAction action) {
return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action));
auto _then(Func&& func) {
return base_t::then(std::forward<Func>(func));
}
+ template <class T>
+ auto _forward_to(T&& pr) {
+ return base_t::forward_to(std::forward<T>(pr));
+ }
template<typename Iterator, typename AsyncAction>
friend inline auto ::crimson::do_for_each(Iterator begin,
Iterator end,
AsyncAction action);
+ template <typename Iterator, typename AsyncAction, typename FutureT>
+ friend class ::crimson::do_for_each_state;
+
template<typename AsyncAction>
friend inline auto ::crimson::repeat(AsyncAction action);