From: Kefu Chai Date: Tue, 21 Jan 2020 04:58:16 +0000 (+0800) Subject: crimson/thread: generalize Task so it works w/ func returns void X-Git-Tag: v15.1.0~141^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d7d19ad65ca927d42ed143ec99a262d633a0e194;p=ceph.git crimson/thread: generalize Task so it works w/ func returns void Signed-off-by: Kefu Chai --- diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h index dbc523882176..d648a6b6ed79 100644 --- a/src/crimson/thread/ThreadPool.h +++ b/src/crimson/thread/ThreadPool.h @@ -22,28 +22,43 @@ struct WorkItem { virtual void process() = 0; }; -template> +template struct Task final : WorkItem { - Func func; - seastar::future_state state; - crimson::thread::Condition on_done; + using T = std::invoke_result_t; + using future_state_t = std::conditional_t, + seastar::future_state<>, + seastar::future_state>; + using futurator_t = seastar::futurize; public: explicit Task(Func&& f) : func(std::move(f)) {} void process() override { try { - state.set(func()); + if constexpr (std::is_void_v) { + func(); + state.set(); + } else { + state.set(func()); + } } catch (...) { state.set_exception(std::current_exception()); } on_done.notify(); } - seastar::future get_future() { + typename futurator_t::type get_future() { return on_done.wait().then([this] { - return seastar::make_ready_future(state.get0(std::move(state).get())); + if (state.failed()) { + return futurator_t::make_exception_future(state.get_exception()); + } else { + return futurator_t::from_tuple(state.get_value()); + } }); } +private: + Func func; + future_state_t state; + crimson::thread::Condition on_done; }; struct SubmitQueue { diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc index ede6ea1ebe07..b3dd719e29f2 100644 --- a/src/test/crimson/test_thread_pool.cc +++ b/src/test/crimson/test_thread_pool.cc @@ -25,6 +25,12 @@ seastar::future<> test_accumulate(ThreadPool& tp) { }); } +seastar::future<> test_void_return(ThreadPool& tp) { + return tp.submit([=] { + std::this_thread::sleep_for(10ns); + }); +} + int main(int argc, char** argv) { ThreadPool tp{2, 128, 0}; @@ -32,6 +38,8 @@ int main(int argc, char** argv) return app.run(argc, argv, [&tp] { return tp.start().then([&tp] { return test_accumulate(tp); + }).then([&tp] { + return test_void_return(tp); }).handle_exception([](auto e) { std::cerr << "Error: " << e << std::endl; seastar::engine().exit(1);