]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/thread: pin thread pool to given CPU 22776/head
authorKefu Chai <kchai@redhat.com>
Fri, 29 Jun 2018 17:33:44 +0000 (01:33 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 29 Jun 2018 17:33:55 +0000 (01:33 +0800)
to take the full advantage of seastar reactor, we need to confine the
alien threads to given CPU core(s). in current setting, it's assumed
that alien threads do not perform CPU-bound tasks, so a single core
would suffice. we can always change the interface to pass a cpu_set_t
or a std::bitset if one CPU core is not enough.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/thread/ThreadPool.cc
src/crimson/thread/ThreadPool.h
src/test/crimson/test_thread_pool.cc

index 96973b8198dea9d8c96af0367d3c1d093580c26e..d1fb262c477625e41f65fb6143da8ee72ecdc718 100644 (file)
@@ -1,16 +1,20 @@
 #include "ThreadPool.h"
+
+#include <pthread.h>
 #include "crimson/net/Config.h"
 #include "include/intarith.h"
 
 namespace ceph::thread {
 
 ThreadPool::ThreadPool(size_t n_threads,
-                       size_t queue_sz)
+                       size_t queue_sz,
+                       unsigned cpu_id)
   : queue_size{round_up_to(queue_sz, seastar::smp::count)},
     pending{queue_size}
 {
   for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this] {
+    threads.emplace_back([this, cpu_id] {
+      pin(cpu_id);
       loop();
     });
   }
@@ -23,6 +27,16 @@ ThreadPool::~ThreadPool()
   }
 }
 
+void ThreadPool::pin(unsigned cpu_id)
+{
+  cpu_set_t cs;
+  CPU_ZERO(&cs);
+  CPU_SET(cpu_id, &cs);
+  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+                                                   sizeof(cs), &cs);
+  assert(r == 0);
+}
+
 void ThreadPool::loop()
 {
   for (;;) {
index 5441a569af037798f529a88b23bb86890a557cb7..d83d264378ac65ef487f161657aebdba0ab66b22 100644 (file)
@@ -71,6 +71,7 @@ class ThreadPool {
   bool is_stopping() const {
     return stopping.load(std::memory_order_relaxed);
   }
+  static void pin(unsigned cpu_id);
   seastar::semaphore& local_free_slots() {
     return submit_queue.local().free_slots;
   }
@@ -82,11 +83,12 @@ public:
    *                 it waits in this queue. we will round this number to
    *                 multiple of the number of cores.
    * @param n_threads the number of threads in this thread pool.
-   * @note, each @c Task has its own ceph::thread::Condition, which possesses
+   * @param cpu the CPU core to which this thread pool is assigned
+   * @note each @c Task has its own ceph::thread::Condition, which possesses
    * possesses an fd, so we should keep the size of queue under a resonable
    * limit.
    */
-  ThreadPool(size_t n_threads, size_t queue_sz);
+  ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
   ~ThreadPool();
   seastar::future<> start();
   seastar::future<> stop();
index 56e59e4a905469e967750d277f2ad70ae9b4b404..63922b81f48409cee5152d8511af9974d5207508 100644 (file)
@@ -26,7 +26,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) {
 
 int main(int argc, char** argv)
 {
-  ThreadPool tp{2, 128};
+  ThreadPool tp{2, 128, 0};
   seastar::app_template app;
   return app.run(argc, argv, [&tp] {
       return tp.start().then([&tp] {