/// an engine for scheduling non-seastar tasks from seastar fibers
class ThreadPool {
- size_t n_threads;
- std::atomic<bool> stopping = false;
- std::vector<std::thread> threads;
- seastar::sharded<SubmitQueue> submit_queue;
- const size_t queue_size;
- std::vector<ShardedWorkQueue> pending_queues;
-
- void loop(std::chrono::milliseconds queue_max_wait, size_t shard);
- bool is_stopping() const {
- return stopping.load(std::memory_order_relaxed);
- }
- static void pin(const std::vector<uint64_t>& cpus);
- static void block_sighup();
- seastar::semaphore& local_free_slots() {
- return submit_queue.local().free_slots;
- }
- ThreadPool(const ThreadPool&) = delete;
- ThreadPool& operator=(const ThreadPool&) = delete;
public:
/**
 * @param queue_sz the depth of the pending queue. before a task is
 *                 scheduled, it waits in this queue.
 */
template <typename Func>
auto submit(Func&& func) {
return submit(::rand() % n_threads, std::forward<Func>(func));
}
+
+private:
+ void loop(std::chrono::milliseconds queue_max_wait, size_t shard);
+ bool is_stopping() const {
+ return stopping.load(std::memory_order_relaxed);
+ }
+ static void pin(const std::vector<uint64_t>& cpus);
+ static void block_sighup();
+ seastar::semaphore& local_free_slots() {
+ return submit_queue.local().free_slots;
+ }
+ ThreadPool(const ThreadPool&) = delete;
+ ThreadPool& operator=(const ThreadPool&) = delete;
+
+private:
+ size_t n_threads;
+ std::atomic<bool> stopping = false;
+ std::vector<std::thread> threads;
+ seastar::sharded<SubmitQueue> submit_queue;
+ const size_t queue_size;
+ std::vector<ShardedWorkQueue> pending_queues;
};
} // namespace crimson::os
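
// A minimal usage sketch of the interface above. It assumes a ThreadPool
// instance has already been constructed and started elsewhere, and that
// submit() returns a seastar::future which resolves with the callable's
// result once a worker thread has run it (the return type is not visible
// in this excerpt); the function name and the lambda body are placeholders.
#include <seastar/core/future.hh>

seastar::future<int> blocking_work(crimson::os::ThreadPool& tp)
{
  // The single-argument submit() picks a worker via ::rand() % n_threads,
  // so the blocking work below never runs on a seastar reactor thread.
  return tp.submit([] {
    return 42;  // stand-in for a blocking syscall or third-party library call
  });
}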