#include "ThreadPool.h"
+
+#include <pthread.h>
#include "crimson/net/Config.h"
#include "include/intarith.h"
namespace ceph::thread {
ThreadPool::ThreadPool(size_t n_threads,
- size_t queue_sz)
+ size_t queue_sz,
+ unsigned cpu_id)
: queue_size{round_up_to(queue_sz, seastar::smp::count)},
pending{queue_size}
{
for (size_t i = 0; i < n_threads; i++) {
- threads.emplace_back([this] {
+ threads.emplace_back([this, cpu_id] {
+ pin(cpu_id);
loop();
});
}
}
}
+void ThreadPool::pin(unsigned cpu_id)
+{
+ cpu_set_t cs;
+ CPU_ZERO(&cs);
+ CPU_SET(cpu_id, &cs);
+ [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+ sizeof(cs), &cs);
+ assert(r == 0);
+}
+
void ThreadPool::loop()
{
for (;;) {
bool is_stopping() const {
return stopping.load(std::memory_order_relaxed);
}
+ static void pin(unsigned cpu_id);
seastar::semaphore& local_free_slots() {
return submit_queue.local().free_slots;
}
* it waits in this queue. we will round this number to
* multiple of the number of cores.
* @param n_threads the number of threads in this thread pool.
- * @note, each @c Task has its own ceph::thread::Condition, which possesses
+ * @param cpu the CPU core to which this thread pool is assigned
+ * @note each @c Task has its own ceph::thread::Condition, which possesses
* possesses an fd, so we should keep the size of queue under a resonable
* limit.
*/
- ThreadPool(size_t n_threads, size_t queue_sz);
+ ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
~ThreadPool();
seastar::future<> start();
seastar::future<> stop();
int main(int argc, char** argv)
{
- ThreadPool tp{2, 128};
+ ThreadPool tp{2, 128, 0};
seastar::app_template app;
return app.run(argc, argv, [&tp] {
return tp.start().then([&tp] {