net/ProtocolV2.cc
net/chained_dispatchers.cc)
set(crimson_thread_srcs
- thread/ThreadPool.cc
thread/Throttle.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
include_directories(SYSTEM "${CMAKE_SOURCE_DIR}/src/rocksdb/include")
set(crimson_alien_srcs
- alien_store.cc)
+ alien_store.cc
+ thread_pool.cc)
list(APPEND crimson_alien_srcs
${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
g_ceph_context = cct.get();
cct->_conf.set_config_values(values);
store = std::make_unique<BlueStore>(cct.get(), path);
- tp = std::make_unique<crimson::thread::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
+ tp = std::make_unique<crimson::os::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
}
seastar::future<> AlienStore::start()
#include "os/ObjectStore.h"
#include "osd/osd_types.h"
+#include "crimson/os/alienstore/thread_pool.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
-#include "crimson/thread/ThreadPool.h"
namespace ceph::os {
class Transaction;
ObjectMap::ObjectMapIterator iter;
AlienStore* store;
};
- mutable std::unique_ptr<crimson::thread::ThreadPool> tp;
+ mutable std::unique_ptr<crimson::os::ThreadPool> tp;
AlienStore(const std::string& path, const ConfigValues& values);
~AlienStore() final;
--- /dev/null
+#include "thread_pool.h"
+
+#include <chrono>
+#include <pthread.h>
+#include "include/intarith.h"
+
+#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::os {
+
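+// round the queue depth up to a multiple of the reactor shard count, so
+// start() can later hand an equal number of submission slots to every shard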
+ThreadPool::ThreadPool(size_t n_threads,
+ size_t queue_sz,
+ unsigned cpu_id)
+ : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+ pending{queue_size}
+{
+ auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
+ for (size_t i = 0; i < n_threads; i++) {
+ threads.emplace_back([this, cpu_id, queue_max_wait] {
+ pin(cpu_id);
+ loop(queue_max_wait);
+ });
+ }
+}
+
+ThreadPool::~ThreadPool()
+{
+ for (auto& thread : threads) {
+ thread.join();
+ }
+}
+
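+// pin the calling worker thread to a single CPU, keeping it off the cores
+// that run the seastar reactors (AlienStore passes this_shard_id() + 10)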
+void ThreadPool::pin(unsigned cpu_id)
+{
+ cpu_set_t cs;
+ CPU_ZERO(&cs);
+ CPU_SET(cpu_id, &cs);
+ [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+ sizeof(cs), &cs);
+ ceph_assert(r == 0);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+{
+ for (;;) {
+ WorkItem* work_item = nullptr;
+ {
+ std::unique_lock lock{mutex};
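+      // the predicate pops a ready item as a side effect. submitters push
+      // and notify without taking the mutex, so a wakeup can be lost; the
+      // timed wait bounds that latency as well as the time it takes to
+      // notice `stopping`. the item itself is processed outside the lock.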
+ cond.wait_for(lock, queue_max_wait,
+ [this, &work_item] {
+ return pending.pop(work_item) || is_stopping();
+ });
+ }
+ if (work_item) {
+ work_item->process();
+ } else if (is_stopping()) {
+ break;
+ }
+ }
+}
+
+seastar::future<> ThreadPool::start()
+{
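+  // queue_size is a multiple of smp::count (see the constructor), so every
+  // shard receives the same number of free slots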
+ auto slots_per_shard = queue_size / seastar::smp::count;
+ return submit_queue.start(slots_per_shard);
+}
+
+seastar::future<> ThreadPool::stop()
+{
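+  // closing each shard's gate drains in-flight submissions before the
+  // worker threads are told to exit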
+ return submit_queue.stop().then([this] {
+ stopping = true;
+ cond.notify_all();
+ });
+}
+
+} // namespace crimson::os
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <tuple>
+#include <type_traits>
+#include <vector>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson::os {
+
+struct WorkItem {
+ virtual ~WorkItem() {}
+ virtual void process() = 0;
+};
+
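+// wraps a callable: process() runs it on a worker thread, stores its result
+// (or exception) in a seastar future state, and signals an eventfd so the
+// submitting reactor shard can resolve the future returned by get_future()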
+template<typename Func>
+struct Task final : WorkItem {
+ using T = std::invoke_result_t<Func>;
+ using future_state_t = std::conditional_t<std::is_void_v<T>,
+ seastar::future_state<>,
+ seastar::future_state<T>>;
+ using futurator_t = seastar::futurize<T>;
+public:
+ explicit Task(Func&& f)
+ : func(std::move(f))
+ {}
+ void process() override {
+ try {
+ if constexpr (std::is_void_v<T>) {
+ func();
+ state.set();
+ } else {
+ state.set(func());
+ }
+ } catch (...) {
+ state.set_exception(std::current_exception());
+ }
+ on_done.write_side().signal(1);
+ }
+ typename futurator_t::type get_future() {
+ return on_done.wait().then([this](size_t) {
+ if (state.failed()) {
+ return futurator_t::make_exception_future(state.get_exception());
+ } else {
+ return futurator_t::from_tuple(state.get_value());
+ }
+ });
+ }
+private:
+ Func func;
+ future_state_t state;
+ seastar::readable_eventfd on_done;
+};
+
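+// per-shard submission bookkeeping: the semaphore caps how many tasks a
+// shard may have pending at once, and the gate tracks in-flight submissions
+// so stop() can drain them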
+struct SubmitQueue {
+ seastar::semaphore free_slots;
+ seastar::gate pending_tasks;
+ explicit SubmitQueue(size_t num_free_slots)
+ : free_slots(num_free_slots)
+ {}
+ seastar::future<> stop() {
+ return pending_tasks.close();
+ }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+ std::atomic<bool> stopping = false;
+ std::mutex mutex;
+ std::condition_variable cond;
+ std::vector<std::thread> threads;
+ seastar::sharded<SubmitQueue> submit_queue;
+ const size_t queue_size;
+ boost::lockfree::queue<WorkItem*> pending;
+
+ void loop(std::chrono::milliseconds queue_max_wait);
+ bool is_stopping() const {
+ return stopping.load(std::memory_order_relaxed);
+ }
+ static void pin(unsigned cpu_id);
+ seastar::semaphore& local_free_slots() {
+ return submit_queue.local().free_slots;
+ }
+ ThreadPool(const ThreadPool&) = delete;
+ ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param n_threads the number of threads in this thread pool
+   * @param queue_sz the depth of the pending queue. before a task is
+   *                 scheduled, it waits in this queue. this number is
+   *                 rounded up to a multiple of the number of cores.
+   * @param cpu the CPU core to which this thread pool is pinned
+   * @note each @c Task owns a seastar::readable_eventfd, which in turn
+   * holds a file descriptor, so the queue depth should be kept under a
+   * reasonable limit.
+   */
+ ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
+ ~ThreadPool();
+ seastar::future<> start();
+ seastar::future<> stop();
+ template<typename Func, typename...Args>
+ auto submit(Func&& func, Args&&... args) {
+ auto packaged = [func=std::move(func),
+ args=std::forward_as_tuple(args...)] {
+ return std::apply(std::move(func), std::move(args));
+ };
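+    // take a submission slot on this shard, enqueue the task on the
+    // lock-free queue, and wake one worker; the returned future resolves on
+    // this shard once the task signals its eventfd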
+ return seastar::with_gate(submit_queue.local().pending_tasks,
+ [packaged=std::move(packaged), this] {
+ return local_free_slots().wait()
+ .then([packaged=std::move(packaged), this] {
+ auto task = new Task{std::move(packaged)};
+ auto fut = task->get_future();
+ pending.push(task);
+ cond.notify_one();
+ return fut.finally([task, this] {
+ local_free_slots().signal();
+ delete task;
+ });
+ });
+ });
+ }
+};
+
+} // namespace crimson::os
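
For orientation, a minimal sketch of driving the pool from seastar code
(hypothetical snippet, not part of this change; it assumes a running seastar
reactor and the crimson::os API introduced above):

    crimson::os::ThreadPool tp{2, 128, 0};  // 2 workers, queue depth 128, pinned to CPU 0
    seastar::future<> f = tp.start().then([&tp] {
      // run a blocking computation off the reactor; its result arrives as a future
      return tp.submit([] { return 6 * 7; }).then([](int v) {
        ceph_assert(v == 42);
      });
    }).finally([&tp] {
      return tp.stop();
    });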
+++ /dev/null
-#include "ThreadPool.h"
-
-#include <chrono>
-#include <pthread.h>
-#include "include/intarith.h"
-
-#include "include/ceph_assert.h"
-#include "crimson/common/config_proxy.h"
-
-using crimson::common::local_conf;
-
-namespace crimson::thread {
-
-ThreadPool::ThreadPool(size_t n_threads,
- size_t queue_sz,
- unsigned cpu_id)
- : queue_size{round_up_to(queue_sz, seastar::smp::count)},
- pending{queue_size}
-{
- auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
- for (size_t i = 0; i < n_threads; i++) {
- threads.emplace_back([this, cpu_id, queue_max_wait] {
- pin(cpu_id);
- loop(queue_max_wait);
- });
- }
-}
-
-ThreadPool::~ThreadPool()
-{
- for (auto& thread : threads) {
- thread.join();
- }
-}
-
-void ThreadPool::pin(unsigned cpu_id)
-{
- cpu_set_t cs;
- CPU_ZERO(&cs);
- CPU_SET(cpu_id, &cs);
- [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
- sizeof(cs), &cs);
- ceph_assert(r == 0);
-}
-
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
-{
- for (;;) {
- WorkItem* work_item = nullptr;
- {
- std::unique_lock lock{mutex};
- cond.wait_for(lock, queue_max_wait,
- [this, &work_item] {
- return pending.pop(work_item) || is_stopping();
- });
- }
- if (work_item) {
- work_item->process();
- } else if (is_stopping()) {
- break;
- }
- }
-}
-
-seastar::future<> ThreadPool::start()
-{
- auto slots_per_shard = queue_size / seastar::smp::count;
- return submit_queue.start(slots_per_shard);
-}
-
-seastar::future<> ThreadPool::stop()
-{
- return submit_queue.stop().then([this] {
- stopping = true;
- cond.notify_all();
- });
-}
-
-} // namespace crimson::thread
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#pragma once
-
-#include <atomic>
-#include <condition_variable>
-#include <tuple>
-#include <type_traits>
-#include <boost/lockfree/queue.hpp>
-#include <boost/optional.hpp>
-#include <seastar/core/future.hh>
-#include <seastar/core/gate.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/semaphore.hh>
-#include <seastar/core/sharded.hh>
-
-namespace crimson::thread {
-
-struct WorkItem {
- virtual ~WorkItem() {}
- virtual void process() = 0;
-};
-
-template<typename Func>
-struct Task final : WorkItem {
- using T = std::invoke_result_t<Func>;
- using future_state_t = std::conditional_t<std::is_void_v<T>,
- seastar::future_state<>,
- seastar::future_state<T>>;
- using futurator_t = seastar::futurize<T>;
-public:
- explicit Task(Func&& f)
- : func(std::move(f))
- {}
- void process() override {
- try {
- if constexpr (std::is_void_v<T>) {
- func();
- state.set();
- } else {
- state.set(func());
- }
- } catch (...) {
- state.set_exception(std::current_exception());
- }
- on_done.write_side().signal(1);
- }
- typename futurator_t::type get_future() {
- return on_done.wait().then([this](size_t) {
- if (state.failed()) {
- return futurator_t::make_exception_future(state.get_exception());
- } else {
- return futurator_t::from_tuple(state.get_value());
- }
- });
- }
-private:
- Func func;
- future_state_t state;
- seastar::readable_eventfd on_done;
-};
-
-struct SubmitQueue {
- seastar::semaphore free_slots;
- seastar::gate pending_tasks;
- explicit SubmitQueue(size_t num_free_slots)
- : free_slots(num_free_slots)
- {}
- seastar::future<> stop() {
- return pending_tasks.close();
- }
-};
-
-/// an engine for scheduling non-seastar tasks from seastar fibers
-class ThreadPool {
- std::atomic<bool> stopping = false;
- std::mutex mutex;
- std::condition_variable cond;
- std::vector<std::thread> threads;
- seastar::sharded<SubmitQueue> submit_queue;
- const size_t queue_size;
- boost::lockfree::queue<WorkItem*> pending;
-
- void loop(std::chrono::milliseconds queue_max_wait);
- bool is_stopping() const {
- return stopping.load(std::memory_order_relaxed);
- }
- static void pin(unsigned cpu_id);
- seastar::semaphore& local_free_slots() {
- return submit_queue.local().free_slots;
- }
- ThreadPool(const ThreadPool&) = delete;
- ThreadPool& operator=(const ThreadPool&) = delete;
-public:
- /**
- * @param queue_sz the depth of pending queue. before a task is scheduled,
- * it waits in this queue. we will round this number to
- * multiple of the number of cores.
- * @param n_threads the number of threads in this thread pool.
- * @param cpu the CPU core to which this thread pool is assigned
- * @note each @c Task has its own crimson::thread::Condition, which possesses
- * an fd, so we should keep the size of queue under a reasonable limit.
- */
- ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
- ~ThreadPool();
- seastar::future<> start();
- seastar::future<> stop();
- template<typename Func, typename...Args>
- auto submit(Func&& func, Args&&... args) {
- auto packaged = [func=std::move(func),
- args=std::forward_as_tuple(args...)] {
- return std::apply(std::move(func), std::move(args));
- };
- return seastar::with_gate(submit_queue.local().pending_tasks,
- [packaged=std::move(packaged), this] {
- return local_free_slots().wait()
- .then([packaged=std::move(packaged), this] {
- auto task = new Task{std::move(packaged)};
- auto fut = task->get_future();
- pending.push(task);
- cond.notify_one();
- return fut.finally([task, this] {
- local_free_slots().signal();
- delete task;
- });
- });
- });
- }
-};
-
-} // namespace crimson::thread
test_async_echo.cc)
target_link_libraries(unittest_async_echo ceph-common global)
-add_executable(unittest_seastar_thread_pool
- test_thread_pool.cc)
-add_ceph_test(unittest_seastar_thread_pool
- unittest_seastar_thread_pool --memory 256M --smp 1)
-target_link_libraries(unittest_seastar_thread_pool crimson)
+add_executable(unittest_seastar_alienstore_thread_pool
+ test_alienstore_thread_pool.cc)
+add_ceph_test(unittest_seastar_alienstore_thread_pool
+ unittest_seastar_alienstore_thread_pool --memory 256M --smp 1)
+target_link_libraries(unittest_seastar_alienstore_thread_pool
+ crimson
+ crimson-os
+ crimson-common)
add_executable(unittest_seastar_config
test_config.cc)
--- /dev/null
+#include <chrono>
+#include <iostream>
+#include <numeric>
+#include <boost/range/irange.hpp>
+#include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/os/alienstore/thread_pool.h"
+#include "include/msgr.h"
+
+using namespace std::chrono_literals;
+using ThreadPool = crimson::os::ThreadPool;
+using crimson::common::local_conf;
+
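+// submit N sleepy additions to the pool and reduce their results on the
+// reactor, verifying that values round-trip through Task's future state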
+seastar::future<> test_accumulate(ThreadPool& tp) {
+ static constexpr auto N = 5;
+ static constexpr auto M = 1;
+ auto slow_plus = [&tp](int i) {
+ return tp.submit([=] {
+ std::this_thread::sleep_for(10ns);
+ return i + M;
+ });
+ };
+ return seastar::map_reduce(
+ boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
+ auto r = boost::irange(0 + M, N + M);
+ if (sum != std::accumulate(r.begin(), r.end(), 0)) {
+ throw std::runtime_error("test_accumulate failed");
+ }
+ });
+}
+
+seastar::future<> test_void_return(ThreadPool& tp) {
+ return tp.submit([=] {
+ std::this_thread::sleep_for(10ns);
+ });
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [] {
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ return crimson::common::sharded_conf().start(init_params.name, cluster)
+ .then([conf_file_list] {
+ return local_conf().parse_config_files(conf_file_list);
+ }).then([] {
+ return seastar::do_with(std::make_unique<crimson::os::ThreadPool>(2, 128, 0),
+ [](auto& tp) {
+ return tp->start().then([&tp] {
+ return test_accumulate(*tp);
+ }).then([&tp] {
+ return test_void_return(*tp);
+ }).finally([&tp] {
+ return tp->stop();
+ });
+ });
+ }).finally([] {
+ return crimson::common::sharded_conf().stop();
+ }).handle_exception([](auto e) {
+ std::cerr << "Error: " << e << std::endl;
+ seastar::engine().exit(1);
+ });
+ });
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_alienstore_thread_pool"
+ * End:
+ */
+++ /dev/null
-#include <chrono>
-#include <iostream>
-#include <numeric>
-#include <seastar/core/app-template.hh>
-#include "common/ceph_argparse.h"
-#include "crimson/common/config_proxy.h"
-#include "crimson/thread/ThreadPool.h"
-#include "include/msgr.h"
-
-using namespace std::chrono_literals;
-using ThreadPool = crimson::thread::ThreadPool;
-using crimson::common::local_conf;
-
-seastar::future<> test_accumulate(ThreadPool& tp) {
- static constexpr auto N = 5;
- static constexpr auto M = 1;
- auto slow_plus = [&tp](int i) {
- return tp.submit([=] {
- std::this_thread::sleep_for(10ns);
- return i + M;
- });
- };
- return seastar::map_reduce(
- boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
- auto r = boost::irange(0 + M, N + M);
- if (sum != std::accumulate(r.begin(), r.end(), 0)) {
- throw std::runtime_error("test_accumulate failed");
- }
- });
-}
-
-seastar::future<> test_void_return(ThreadPool& tp) {
- return tp.submit([=] {
- std::this_thread::sleep_for(10ns);
- });
-}
-
-int main(int argc, char** argv)
-{
- seastar::app_template app;
- return app.run(argc, argv, [] {
- std::vector<const char*> args;
- std::string cluster;
- std::string conf_file_list;
- auto init_params = ceph_argparse_early_args(args,
- CEPH_ENTITY_TYPE_CLIENT,
- &cluster,
- &conf_file_list);
- return crimson::common::sharded_conf().start(init_params.name, cluster)
- .then([conf_file_list] {
- return local_conf().parse_config_files(conf_file_list);
- }).then([] {
- return seastar::do_with(std::make_unique<crimson::thread::ThreadPool>(2, 128, 0),
- [](auto& tp) {
- return tp->start().then([&tp] {
- return test_accumulate(*tp);
- }).then([&tp] {
- return test_void_return(*tp);
- }).finally([&tp] {
- return tp->stop();
- });
- });
- }).finally([] {
- return crimson::common::sharded_conf().stop();
- }).handle_exception([](auto e) {
- std::cerr << "Error: " << e << std::endl;
- seastar::engine().exit(1);
- });
- });
-}
-
-/*
- * Local Variables:
- * compile-command: "make -j4 \
- * -C ../../../build \
- * unittest_seastar_thread_pool"
- * End:
- */