g_ceph_context = cct.get();
cct->_conf.set_config_values(values);
store = std::make_unique<BlueStore>(cct.get(), path);
-
- long cpu_id = 0;
- if (long nr_cpus = sysconf(_SC_NPROCESSORS_ONLN); nr_cpus != -1) {
- cpu_id = nr_cpus - 1;
- } else {
- logger().error("{}: unable to get nproc: {}", __func__, errno);
- cpu_id = -1;
- }
const auto num_threads =
cct->_conf.get_val<uint64_t>("crimson_alien_op_num_threads");
- tp = std::make_unique<crimson::os::ThreadPool>(num_threads, 128, cpu_id);
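+ // pin the alien threads to the cores listed in the
+ // "crimson_alien_thread_cpu_cores" option; an empty list leaves them unpinned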
+ std::vector<uint64_t> cpu_cores = _parse_cpu_cores();
+ tp = std::make_unique<crimson::os::ThreadPool>(num_threads, 128, cpu_cores);
}
seastar::future<> AlienStore::start()
return iter->status();
}
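+// Parse "crimson_alien_thread_cpu_cores" into an explicit list of cores.
+// For example, with the inclusive-range parsing below, "0-2,7" yields
+// {0, 1, 2, 7}.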
+std::vector<uint64_t> AlienStore::_parse_cpu_cores()
+{
+ std::vector<uint64_t> cpu_cores;
+ auto cpu_string =
+ cct->_conf.get_val<std::string>("crimson_alien_thread_cpu_cores");
+
+ std::string token;
+ std::istringstream token_stream(cpu_string);
+ while (std::getline(token_stream, token, ',')) {
+ std::istringstream cpu_stream(token);
+ std::string cpu;
+ std::getline(cpu_stream, cpu, '-');
+ uint64_t start_cpu = std::stoull(cpu);
+ // a token is either a single core ("3") or an inclusive range ("0-3");
+ // if there is no '-' part, the second getline fails and the range
+ // collapses to the single starting core
+ uint64_t end_cpu = start_cpu;
+ if (std::getline(cpu_stream, cpu, '-')) {
+ end_cpu = std::stoull(cpu);
+ }
+ for (uint64_t i = start_cpu; i <= end_cpu; i++) {
+ cpu_cores.push_back(i);
+ }
+ }
+ return cpu_cores;
+}
+
}
std::unique_ptr<CephContext> cct;
seastar::gate transaction_gate;
std::unordered_map<coll_t, CollectionRef> coll_map;
+ std::vector<uint64_t> _parse_cpu_cores();
};
}
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
#include "thread_pool.h"
#include <chrono>
ThreadPool::ThreadPool(size_t n_threads,
size_t queue_sz,
- long cpu_id)
+ std::vector<uint64_t> cpus)
: queue_size{round_up_to(queue_sz, seastar::smp::count)},
pending{queue_size}
{
auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
for (size_t i = 0; i < n_threads; i++) {
- threads.emplace_back([this, cpu_id, queue_max_wait] {
- if (cpu_id >= 0) {
- pin(cpu_id);
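+ // each worker thread copies the core list and pins itself before
+ // entering its task loop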
+ threads.emplace_back([this, cpus, queue_max_wait] {
+ if (!cpus.empty()) {
+ pin(cpus);
}
loop(queue_max_wait);
});
}
}
-void ThreadPool::pin(unsigned cpu_id)
+void ThreadPool::pin(const std::vector<uint64_t>& cpus)
{
cpu_set_t cs;
CPU_ZERO(&cs);
- CPU_SET(cpu_id, &cs);
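+ // build a single affinity mask containing every requested core; the
+ // kernel may then schedule this thread on any of them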
+ for (auto cpu : cpus) {
+ CPU_SET(cpu, &cs);
+ }
[[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
sizeof(cs), &cs);
ceph_assert(r == 0);
bool is_stopping() const {
return stopping.load(std::memory_order_relaxed);
}
- static void pin(unsigned cpu_id);
+ static void pin(const std::vector<uint64_t>& cpus);
seastar::semaphore& local_free_slots() {
return submit_queue.local().free_slots;
}
* @note each @c Task has its own crimson::thread::Condition, which possesses
* an fd, so we should keep the size of queue under a reasonable limit.
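+ * @param cpus if not empty, every worker thread pins itself to these CPU
+ *             cores via pthread_setaffinity_np()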
*/
- ThreadPool(size_t n_threads, size_t queue_sz, long cpu);
+ ThreadPool(size_t n_threads, size_t queue_sz, std::vector<uint64_t> cpus);
~ThreadPool();
seastar::future<> start();
seastar::future<> stop();