From: Kefu Chai Date: Thu, 7 Jun 2018 12:23:11 +0000 (+0800) Subject: crimson/thread: add a thread pool impl X-Git-Tag: v14.0.1~983^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=28f079e08a0f9e387e6973adfc3c336519d64874;p=ceph-ci.git crimson/thread: add a thread pool impl Signed-off-by: Kefu Chai --- diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index e5162eff419..36a7b102a7e 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(net) +add_subdirectory(thread) diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h index 21b0a4b5269..90929bdedb2 100644 --- a/src/crimson/net/Config.h +++ b/src/crimson/net/Config.h @@ -20,6 +20,7 @@ constexpr struct simple_md_config_t { bool ms_die_on_skipped_message = true; std::chrono::milliseconds ms_initial_backoff = 200ms; std::chrono::milliseconds ms_max_backoff = 15000ms; + std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms; size_t osd_client_message_size_cap = 500ULL << 20; } conf; } diff --git a/src/crimson/thread/CMakeLists.txt b/src/crimson/thread/CMakeLists.txt index ff2e5e44fc5..339ff4b1bc4 100644 --- a/src/crimson/thread/CMakeLists.txt +++ b/src/crimson/thread/CMakeLists.txt @@ -1,4 +1,5 @@ set(crimson_thread_srcs + ThreadPool.cc Throttle.cc) add_library(crimson_thread_objs OBJECT ${crimson_thread_srcs}) target_compile_definitions(crimson_thread_objs diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc new file mode 100644 index 00000000000..96973b8198d --- /dev/null +++ b/src/crimson/thread/ThreadPool.cc @@ -0,0 +1,60 @@ +#include "ThreadPool.h" +#include "crimson/net/Config.h" +#include "include/intarith.h" + +namespace ceph::thread { + +ThreadPool::ThreadPool(size_t n_threads, + size_t queue_sz) + : queue_size{round_up_to(queue_sz, seastar::smp::count)}, + pending{queue_size} +{ + for (size_t i = 0; i < n_threads; i++) { + threads.emplace_back([this] { + loop(); + }); + } +} + +ThreadPool::~ThreadPool() +{ + for (auto& thread : threads) { + thread.join(); + } +} + +void ThreadPool::loop() +{ + for (;;) { + WorkItem* work_item = nullptr; + { + std::unique_lock lock{mutex}; + cond.wait_for(lock, + ceph::net::conf.threadpool_empty_queue_max_wait, + [this, &work_item] { + return pending.pop(work_item) || is_stopping(); + }); + } + if (work_item) { + work_item->process(); + } else if (is_stopping()) { + break; + } + } +} + +seastar::future<> ThreadPool::start() +{ + auto slots_per_shard = queue_size / seastar::smp::count; + return submit_queue.start(slots_per_shard); +} + +seastar::future<> ThreadPool::stop() +{ + return submit_queue.stop().then([this] { + stopping = true; + cond.notify_all(); + }); +} + +} // namespace ceph::thread diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h new file mode 100644 index 00000000000..5441a569af0 --- /dev/null +++ b/src/crimson/thread/ThreadPool.h @@ -0,0 +1,116 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Condition.h" + +namespace ceph::thread { + +struct WorkItem { + virtual ~WorkItem() {} + virtual void process() = 0; +}; + +template> +struct Task final : WorkItem { + Func func; + seastar::future_state state; + ceph::thread::Condition on_done; +public: + explicit Task(Func&& f) + : func(std::move(f)) + {} + void process() override { + try { + state.set(func()); + } catch (...) { + state.set_exception(std::current_exception()); + } + on_done.notify(); + } + seastar::future get_future() { + return on_done.wait().then([this] { + return seastar::make_ready_future(state.get0(std::move(state).get())); + }); + } +}; + +struct SubmitQueue { + seastar::semaphore free_slots; + seastar::gate pending_tasks; + explicit SubmitQueue(size_t num_free_slots) + : free_slots(num_free_slots) + {} + seastar::future<> stop() { + return pending_tasks.close(); + } +}; + +/// an engine for scheduling non-seastar tasks from seastar fibers +class ThreadPool { + std::atomic stopping = false; + std::mutex mutex; + std::condition_variable cond; + std::vector threads; + seastar::sharded submit_queue; + const size_t queue_size; + boost::lockfree::queue pending; + + void loop(); + bool is_stopping() const { + return stopping.load(std::memory_order_relaxed); + } + seastar::semaphore& local_free_slots() { + return submit_queue.local().free_slots; + } + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; +public: + /** + * @param queue_sz the depth of pending queue. before a task is scheduled, + * it waits in this queue. we will round this number to + * multiple of the number of cores. + * @param n_threads the number of threads in this thread pool. + * @note, each @c Task has its own ceph::thread::Condition, which possesses + * possesses an fd, so we should keep the size of queue under a resonable + * limit. + */ + ThreadPool(size_t n_threads, size_t queue_sz); + ~ThreadPool(); + seastar::future<> start(); + seastar::future<> stop(); + template + auto submit(Func&& func, Args&&... args) { + auto packaged = [func=std::move(func), + args=std::forward_as_tuple(args...)] { + return std::apply(std::move(func), std::move(args)); + }; + return seastar::with_gate(submit_queue.local().pending_tasks, + [packaged=std::move(packaged), this] { + return local_free_slots().wait() + .then([packaged=std::move(packaged), this] { + auto task = new Task{std::move(packaged)}; + auto fut = task->get_future(); + pending.push(task); + cond.notify_one(); + return fut.finally([task, this] { + local_free_slots().signal(); + delete task; + }); + }); + }); + } +}; + +} // namespace ceph::thread diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 61bd791b869..313afa943ff 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -29,3 +29,9 @@ set(test_alien_echo_srcs add_executable(unittest_seastar_echo ${test_alien_echo_srcs}) add_ceph_unittest(unittest_seastar_echo) target_link_libraries(unittest_seastar_echo ceph-common global Seastar::seastar) + +add_executable(unittest_seastar_thread_pool + test_thread_pool.cc + $) +add_ceph_unittest(unittest_seastar_thread_pool) +target_link_libraries(unittest_seastar_thread_pool Seastar::seastar) diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc new file mode 100644 index 00000000000..56e59e4a905 --- /dev/null +++ b/src/test/crimson/test_thread_pool.cc @@ -0,0 +1,49 @@ +#include +#include +#include +#include "crimson/thread/ThreadPool.h" + +using namespace std::chrono_literals; +using ThreadPool = ceph::thread::ThreadPool; + +seastar::future<> test_accumulate(ThreadPool& tp) { + static constexpr auto N = 5; + static constexpr auto M = 1; + auto slow_plus = [&tp](int i) { + return tp.submit([=] { + std::this_thread::sleep_for(10ns); + return i + M; + }); + }; + return seastar::map_reduce( + boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) { + auto r = boost::irange(0 + M, N + M); + if (sum != std::accumulate(r.begin(), r.end(), 0)) { + throw std::runtime_error("test_accumulate failed"); + } + }); +} + +int main(int argc, char** argv) +{ + ThreadPool tp{2, 128}; + seastar::app_template app; + return app.run(argc, argv, [&tp] { + return tp.start().then([&tp] { + return test_accumulate(tp); + }).handle_exception([](auto e) { + std::cerr << "Error: " << e << std::endl; + seastar::engine().exit(1); + }).finally([&tp] { + return tp.stop(); + }); + }); +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_thread_pool" + * End: + */