From d7d19ad65ca927d42ed143ec99a262d633a0e194 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 21 Jan 2020 12:58:16 +0800 Subject: [PATCH] crimson/thread: generalize Task so it works w/ func returns void Signed-off-by: Kefu Chai --- src/crimson/thread/ThreadPool.h | 29 +++++++++++++++++++++------- src/test/crimson/test_thread_pool.cc | 8 ++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h index dbc52388217..d648a6b6ed7 100644 --- a/src/crimson/thread/ThreadPool.h +++ b/src/crimson/thread/ThreadPool.h @@ -22,28 +22,43 @@ struct WorkItem { virtual void process() = 0; }; -template> +template struct Task final : WorkItem { - Func func; - seastar::future_state state; - crimson::thread::Condition on_done; + using T = std::invoke_result_t; + using future_state_t = std::conditional_t, + seastar::future_state<>, + seastar::future_state>; + using futurator_t = seastar::futurize; public: explicit Task(Func&& f) : func(std::move(f)) {} void process() override { try { - state.set(func()); + if constexpr (std::is_void_v) { + func(); + state.set(); + } else { + state.set(func()); + } } catch (...) { state.set_exception(std::current_exception()); } on_done.notify(); } - seastar::future get_future() { + typename futurator_t::type get_future() { return on_done.wait().then([this] { - return seastar::make_ready_future(state.get0(std::move(state).get())); + if (state.failed()) { + return futurator_t::make_exception_future(state.get_exception()); + } else { + return futurator_t::from_tuple(state.get_value()); + } }); } +private: + Func func; + future_state_t state; + crimson::thread::Condition on_done; }; struct SubmitQueue { diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc index ede6ea1ebe0..b3dd719e29f 100644 --- a/src/test/crimson/test_thread_pool.cc +++ b/src/test/crimson/test_thread_pool.cc @@ -25,6 +25,12 @@ seastar::future<> test_accumulate(ThreadPool& tp) { }); } +seastar::future<> test_void_return(ThreadPool& tp) { + return tp.submit([=] { + std::this_thread::sleep_for(10ns); + }); +} + int main(int argc, char** argv) { ThreadPool tp{2, 128, 0}; @@ -32,6 +38,8 @@ int main(int argc, char** argv) return app.run(argc, argv, [&tp] { return tp.start().then([&tp] { return test_accumulate(tp); + }).then([&tp] { + return test_void_return(tp); }).handle_exception([](auto e) { std::cerr << "Error: " << e << std::endl; seastar::engine().exit(1); -- 2.39.5