]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/thread: generalize Task so it works w/ func returns void 32742/head
authorKefu Chai <kchai@redhat.com>
Tue, 21 Jan 2020 04:58:16 +0000 (12:58 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 21 Jan 2020 13:39:42 +0000 (21:39 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/thread/ThreadPool.h
src/test/crimson/test_thread_pool.cc

index dbc523882176ac7dd9fe822c8b6c1198b5f01d42..d648a6b6ed79c8b59f3c0ffff3206b80fb0dc61d 100644 (file)
@@ -22,28 +22,43 @@ struct WorkItem {
   virtual void process() = 0;
 };
 
-template<typename Func, typename T = std::invoke_result_t<Func>>
+template<typename Func>
 struct Task final : WorkItem {
-  Func func;
-  seastar::future_state<T> state;
-  crimson::thread::Condition on_done;
+  using T = std::invoke_result_t<Func>;
+  using future_state_t = std::conditional_t<std::is_void_v<T>,
+                                           seastar::future_state<>,
+                                           seastar::future_state<T>>;
+  using futurator_t = seastar::futurize<T>;
 public:
   explicit Task(Func&& f)
     : func(std::move(f))
   {}
   void process() override {
     try {
-      state.set(func());
+      if constexpr (std::is_void_v<T>) {
+        func();
+        state.set();
+      } else {
+        state.set(func());
+      }
     } catch (...) {
       state.set_exception(std::current_exception());
     }
     on_done.notify();
   }
-  seastar::future<T> get_future() {
+  typename futurator_t::type get_future() {
     return on_done.wait().then([this] {
-      return seastar::make_ready_future<T>(state.get0(std::move(state).get()));
+      if (state.failed()) {
+       return futurator_t::make_exception_future(state.get_exception());
+      } else {
+       return futurator_t::from_tuple(state.get_value());
+      }
     });
   }
+private:
+  Func func;
+  future_state_t state;
+  crimson::thread::Condition on_done;
 };
 
 struct SubmitQueue {
index ede6ea1ebe07c04c0ec8f45c751ef521d14ed200..b3dd719e29f2a3dc9c185630b5a51a4933311e5d 100644 (file)
@@ -25,6 +25,12 @@ seastar::future<> test_accumulate(ThreadPool& tp) {
   });
 }
 
+seastar::future<> test_void_return(ThreadPool& tp) {
+  return tp.submit([=] {
+    std::this_thread::sleep_for(10ns);
+  });
+}
+
 int main(int argc, char** argv)
 {
   ThreadPool tp{2, 128, 0};
@@ -32,6 +38,8 @@ int main(int argc, char** argv)
   return app.run(argc, argv, [&tp] {
       return tp.start().then([&tp] {
           return test_accumulate(tp);
+        }).then([&tp] {
+          return test_void_return(tp);
         }).handle_exception([](auto e) {
           std::cerr << "Error: " << e << std::endl;
           seastar::engine().exit(1);