]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/thread: add a thread pool impl
authorKefu Chai <kchai@redhat.com>
Thu, 7 Jun 2018 12:23:11 +0000 (20:23 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 28 Jun 2018 15:52:54 +0000 (23:52 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/CMakeLists.txt
src/crimson/net/Config.h
src/crimson/thread/CMakeLists.txt
src/crimson/thread/ThreadPool.cc [new file with mode: 0644]
src/crimson/thread/ThreadPool.h [new file with mode: 0644]
src/test/crimson/CMakeLists.txt
src/test/crimson/test_thread_pool.cc [new file with mode: 0644]

index e5162eff419bbc04e7d8f3d85b3970899f5b4bd1..36a7b102a7eed7e51d5f14ef88ca17f1d30cfe0e 100644 (file)
@@ -1 +1,2 @@
 add_subdirectory(net)
+add_subdirectory(thread)
index 21b0a4b5269a1325efea1de59ab07a8e6d40ac93..90929bdedb251890ce01f851b99f81f6bc186bff 100644 (file)
@@ -20,6 +20,7 @@ constexpr struct simple_md_config_t {
   bool ms_die_on_skipped_message = true;
   std::chrono::milliseconds ms_initial_backoff = 200ms;
   std::chrono::milliseconds ms_max_backoff = 15000ms;
+  std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms;
   size_t osd_client_message_size_cap = 500ULL << 20;
 } conf;
 }
index ff2e5e44fc5ebb126f140fd691eea5e26b23b4cd..339ff4b1bc43f6fb55b8238415e41f62076d41dc 100644 (file)
@@ -1,4 +1,5 @@
 set(crimson_thread_srcs
+  ThreadPool.cc
   Throttle.cc)
 add_library(crimson_thread_objs OBJECT ${crimson_thread_srcs})
 target_compile_definitions(crimson_thread_objs
diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc
new file mode 100644 (file)
index 0000000..96973b8
--- /dev/null
@@ -0,0 +1,60 @@
+#include "ThreadPool.h"
+#include "crimson/net/Config.h"
+#include "include/intarith.h"
+
+namespace ceph::thread {
+
+ThreadPool::ThreadPool(size_t n_threads,
+                       size_t queue_sz)
+  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending{queue_size}
+{
+  for (size_t i = 0; i < n_threads; i++) {
+    threads.emplace_back([this] {
+      loop();
+    });
+  }
+}
+
+ThreadPool::~ThreadPool()
+{
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+void ThreadPool::loop()
+{
+  for (;;) {
+    WorkItem* work_item = nullptr;
+    {
+      std::unique_lock lock{mutex};
+      cond.wait_for(lock,
+                    ceph::net::conf.threadpool_empty_queue_max_wait,
+                    [this, &work_item] {
+        return pending.pop(work_item) || is_stopping();
+      });
+    }
+    if (work_item) {
+      work_item->process();
+    } else if (is_stopping()) {
+      break;
+    }
+  }
+}
+
+seastar::future<> ThreadPool::start()
+{
+  auto slots_per_shard = queue_size / seastar::smp::count;
+  return submit_queue.start(slots_per_shard);
+}
+
+seastar::future<> ThreadPool::stop()
+{
+  return submit_queue.stop().then([this] {
+    stopping = true;
+    cond.notify_all();
+  });
+}
+
+} // namespace ceph::thread
diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h
new file mode 100644 (file)
index 0000000..5441a56
--- /dev/null
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <tuple>
+#include <type_traits>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <core/future.hh>
+#include <core/gate.hh>
+#include <core/semaphore.hh>
+#include <core/sharded.hh>
+
+#include "Condition.h"
+
+namespace ceph::thread {
+
+struct WorkItem {
+  virtual ~WorkItem() {}
+  virtual void process() = 0;
+};
+
+template<typename Func, typename T = std::invoke_result_t<Func>>
+struct Task final : WorkItem {
+  Func func;
+  seastar::future_state<T> state;
+  ceph::thread::Condition on_done;
+public:
+  explicit Task(Func&& f)
+    : func(std::move(f))
+  {}
+  void process() override {
+    try {
+      state.set(func());
+    } catch (...) {
+      state.set_exception(std::current_exception());
+    }
+    on_done.notify();
+  }
+  seastar::future<T> get_future() {
+    return on_done.wait().then([this] {
+      return seastar::make_ready_future<T>(state.get0(std::move(state).get()));
+    });
+  }
+};
+
+struct SubmitQueue {
+  seastar::semaphore free_slots;
+  seastar::gate pending_tasks;
+  explicit SubmitQueue(size_t num_free_slots)
+    : free_slots(num_free_slots)
+  {}
+  seastar::future<> stop() {
+    return pending_tasks.close();
+  }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+  std::atomic<bool> stopping = false;
+  std::mutex mutex;
+  std::condition_variable cond;
+  std::vector<std::thread> threads;
+  seastar::sharded<SubmitQueue> submit_queue;
+  const size_t queue_size;
+  boost::lockfree::queue<WorkItem*> pending;
+
+  void loop();
+  bool is_stopping() const {
+    return stopping.load(std::memory_order_relaxed);
+  }
+  seastar::semaphore& local_free_slots() {
+    return submit_queue.local().free_slots;
+  }
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param queue_sz the depth of pending queue. before a task is scheduled,
+   *                 it waits in this queue. we will round this number to
+   *                 multiple of the number of cores.
+   * @param n_threads the number of threads in this thread pool.
+   * @note, each @c Task has its own ceph::thread::Condition, which possesses
+   * possesses an fd, so we should keep the size of queue under a resonable
+   * limit.
+   */
+  ThreadPool(size_t n_threads, size_t queue_sz);
+  ~ThreadPool();
+  seastar::future<> start();
+  seastar::future<> stop();
+  template<typename Func, typename...Args>
+  auto submit(Func&& func, Args&&... args) {
+    auto packaged = [func=std::move(func),
+                     args=std::forward_as_tuple(args...)] {
+      return std::apply(std::move(func), std::move(args));
+    };
+    return seastar::with_gate(submit_queue.local().pending_tasks,
+      [packaged=std::move(packaged), this] {
+        return local_free_slots().wait()
+          .then([packaged=std::move(packaged), this] {
+            auto task = new Task{std::move(packaged)};
+            auto fut = task->get_future();
+            pending.push(task);
+            cond.notify_one();
+            return fut.finally([task, this] {
+              local_free_slots().signal();
+              delete task;
+            });
+          });
+        });
+  }
+};
+
+} // namespace ceph::thread
index 61bd791b869edc1d7d8003977e57b9e01a75d3ea..313afa943fff5788a729e511fe88640d4eacbc44 100644 (file)
@@ -29,3 +29,9 @@ set(test_alien_echo_srcs
 add_executable(unittest_seastar_echo ${test_alien_echo_srcs})
 add_ceph_unittest(unittest_seastar_echo)
 target_link_libraries(unittest_seastar_echo ceph-common global Seastar::seastar)
+
+add_executable(unittest_seastar_thread_pool
+  test_thread_pool.cc
+  $<TARGET_OBJECTS:crimson_thread_objs>)
+add_ceph_unittest(unittest_seastar_thread_pool)
+target_link_libraries(unittest_seastar_thread_pool Seastar::seastar)
diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc
new file mode 100644 (file)
index 0000000..56e59e4
--- /dev/null
@@ -0,0 +1,49 @@
+#include <chrono>
+#include <numeric>
+#include <core/app-template.hh>
+#include "crimson/thread/ThreadPool.h"
+
+using namespace std::chrono_literals;
+using ThreadPool = ceph::thread::ThreadPool;
+
+seastar::future<> test_accumulate(ThreadPool& tp) {
+  static constexpr auto N = 5;
+  static constexpr auto M = 1;
+  auto slow_plus = [&tp](int i) {
+    return tp.submit([=] {
+      std::this_thread::sleep_for(10ns);
+      return i + M;
+    });
+  };
+  return seastar::map_reduce(
+    boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
+    auto r = boost::irange(0 + M, N + M);
+    if (sum != std::accumulate(r.begin(), r.end(), 0)) {
+      throw std::runtime_error("test_accumulate failed");
+    }
+  });
+}
+
+int main(int argc, char** argv)
+{
+  ThreadPool tp{2, 128};
+  seastar::app_template app;
+  return app.run(argc, argv, [&tp] {
+      return tp.start().then([&tp] {
+          return test_accumulate(tp);
+        }).handle_exception([](auto e) {
+          std::cerr << "Error: " << e << std::endl;
+          seastar::engine().exit(1);
+        }).finally([&tp] {
+          return tp.stop();
+        });
+      });
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_thread_pool"
+ * End:
+ */