virtual void process() = 0;
};
-template<typename Func, typename T = std::invoke_result_t<Func>>
+template<typename Func>
struct Task final : WorkItem {
- Func func;
- seastar::future_state<T> state;
- crimson::thread::Condition on_done;
+ using T = std::invoke_result_t<Func>;
+ using future_state_t = std::conditional_t<std::is_void_v<T>,
+ seastar::future_state<>,
+ seastar::future_state<T>>;
+ using futurator_t = seastar::futurize<T>;
public:
explicit Task(Func&& f)
: func(std::move(f))
{}
void process() override {
try {
- state.set(func());
+ if constexpr (std::is_void_v<T>) {
+ func();
+ state.set();
+ } else {
+ state.set(func());
+ }
} catch (...) {
state.set_exception(std::current_exception());
}
on_done.notify();
}
- seastar::future<T> get_future() {
+ typename futurator_t::type get_future() {
return on_done.wait().then([this] {
- return seastar::make_ready_future<T>(state.get0(std::move(state).get()));
+ if (state.failed()) {
+ return futurator_t::make_exception_future(state.get_exception());
+ } else {
+ return futurator_t::from_tuple(state.get_value());
+ }
});
}
+private:
+ Func func;
+ future_state_t state;
+ crimson::thread::Condition on_done;
};
struct SubmitQueue {
});
}
+seastar::future<> test_void_return(ThreadPool& tp) {
+ return tp.submit([=] {
+ std::this_thread::sleep_for(10ns);
+ });
+}
+
int main(int argc, char** argv)
{
ThreadPool tp{2, 128, 0};
return app.run(argc, argv, [&tp] {
return tp.start().then([&tp] {
return test_accumulate(tp);
+ }).then([&tp] {
+ return test_void_return(tp);
}).handle_exception([](auto e) {
std::cerr << "Error: " << e << std::endl;
seastar::engine().exit(1);