From 460ad1f8a1e14c3fc10c31111e7c517bf830b886 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 30 Jun 2018 01:33:44 +0800 Subject: [PATCH] crimson/thread: pin thread pool to given CPU to take the full advantage of seastar reactor, we need to confine the alien threads to given CPU core(s). in current setting, it's assumed that alien threads do not perform CPU-bound tasks, so a single core would suffice. we can always change the interface to pass a cpu_set_t or a std::bitset if one CPU core is not enough. Signed-off-by: Kefu Chai --- src/crimson/thread/ThreadPool.cc | 18 ++++++++++++++++-- src/crimson/thread/ThreadPool.h | 6 ++++-- src/test/crimson/test_thread_pool.cc | 2 +- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc index 96973b8198dea..d1fb262c47762 100644 --- a/src/crimson/thread/ThreadPool.cc +++ b/src/crimson/thread/ThreadPool.cc @@ -1,16 +1,20 @@ #include "ThreadPool.h" + +#include #include "crimson/net/Config.h" #include "include/intarith.h" namespace ceph::thread { ThreadPool::ThreadPool(size_t n_threads, - size_t queue_sz) + size_t queue_sz, + unsigned cpu_id) : queue_size{round_up_to(queue_sz, seastar::smp::count)}, pending{queue_size} { for (size_t i = 0; i < n_threads; i++) { - threads.emplace_back([this] { + threads.emplace_back([this, cpu_id] { + pin(cpu_id); loop(); }); } @@ -23,6 +27,16 @@ ThreadPool::~ThreadPool() } } +void ThreadPool::pin(unsigned cpu_id) +{ + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(), + sizeof(cs), &cs); + assert(r == 0); +} + void ThreadPool::loop() { for (;;) { diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h index 5441a569af037..d83d264378ac6 100644 --- a/src/crimson/thread/ThreadPool.h +++ b/src/crimson/thread/ThreadPool.h @@ -71,6 +71,7 @@ class ThreadPool { bool is_stopping() const { return stopping.load(std::memory_order_relaxed); } + static void pin(unsigned cpu_id); seastar::semaphore& local_free_slots() { return submit_queue.local().free_slots; } @@ -82,11 +83,12 @@ public: * it waits in this queue. we will round this number to * multiple of the number of cores. * @param n_threads the number of threads in this thread pool. - * @note, each @c Task has its own ceph::thread::Condition, which possesses + * @param cpu the CPU core to which this thread pool is assigned + * @note each @c Task has its own ceph::thread::Condition, which possesses * possesses an fd, so we should keep the size of queue under a resonable * limit. */ - ThreadPool(size_t n_threads, size_t queue_sz); + ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu); ~ThreadPool(); seastar::future<> start(); seastar::future<> stop(); diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc index 56e59e4a90546..63922b81f4840 100644 --- a/src/test/crimson/test_thread_pool.cc +++ b/src/test/crimson/test_thread_pool.cc @@ -26,7 +26,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) { int main(int argc, char** argv) { - ThreadPool tp{2, 128}; + ThreadPool tp{2, 128, 0}; seastar::app_template app; return app.run(argc, argv, [&tp] { return tp.start().then([&tp] { -- 2.39.5