From: Radoslaw Zarzynski
Date: Thu, 18 Jun 2020 13:40:43 +0000 (+0200)
Subject: crimson: make ThreadPool specific to AlienStore.
X-Git-Tag: wip-pdonnell-testing-20200918.022351~950^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8b8cb37de49a74ff7116975ff4603fbdf5575f7f;p=ceph-ci.git

crimson: make ThreadPool specific to AlienStore.

Signed-off-by: Radoslaw Zarzynski
---
diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt
index e30aef360fe..f11718fdeb2 100644
--- a/src/crimson/CMakeLists.txt
+++ b/src/crimson/CMakeLists.txt
@@ -167,7 +167,6 @@ set(crimson_net_srcs
   net/ProtocolV2.cc
   net/chained_dispatchers.cc)
 set(crimson_thread_srcs
-  thread/ThreadPool.cc
   thread/Throttle.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt
index e3343d52d6f..f616dcc8190 100644
--- a/src/crimson/os/alienstore/CMakeLists.txt
+++ b/src/crimson/os/alienstore/CMakeLists.txt
@@ -1,6 +1,7 @@
 include_directories(SYSTEM "${CMAKE_SOURCE_DIR}/src/rocksdb/include")
 set(crimson_alien_srcs
-  alien_store.cc)
+  alien_store.cc
+  thread_pool.cc)
 
 list(APPEND crimson_alien_srcs
   ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc
index 93105a64221..27395ae5caf 100644
--- a/src/crimson/os/alienstore/alien_store.cc
+++ b/src/crimson/os/alienstore/alien_store.cc
@@ -63,7 +63,7 @@ AlienStore::AlienStore(const std::string& path, const ConfigValues& values)
   g_ceph_context = cct.get();
   cct->_conf.set_config_values(values);
   store = std::make_unique<BlueStore>(cct.get(), path);
-  tp = std::make_unique<crimson::thread::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
+  tp = std::make_unique<crimson::os::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
 }
 
 seastar::future<> AlienStore::start()
diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h
index 3552274344c..f862d7b8409 100644
--- a/src/crimson/os/alienstore/alien_store.h
+++ b/src/crimson/os/alienstore/alien_store.h
@@ -10,9 +10,9 @@
 #include "os/ObjectStore.h"
 #include "osd/osd_types.h"
 
+#include "crimson/os/alienstore/thread_pool.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
-#include "crimson/thread/ThreadPool.h"
 
 namespace ceph::os {
 class Transaction;
@@ -38,7 +38,7 @@ public:
     ObjectMap::ObjectMapIterator iter;
     AlienStore* store;
   };
-  mutable std::unique_ptr<crimson::thread::ThreadPool> tp;
+  mutable std::unique_ptr<crimson::os::ThreadPool> tp;
 
   AlienStore(const std::string& path, const ConfigValues& values);
   ~AlienStore() final;
diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc
new file mode 100644
index 00000000000..468c733bf21
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.cc
@@ -0,0 +1,79 @@
+#include "thread_pool.h"
+
+#include <chrono>
+#include <pthread.h>
+#include "include/intarith.h"
+
+#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::os {
+
+ThreadPool::ThreadPool(size_t n_threads,
+                       size_t queue_sz,
+                       unsigned cpu_id)
+  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending{queue_size}
+{
+  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
+  for (size_t i = 0; i < n_threads; i++) {
+    threads.emplace_back([this, cpu_id, queue_max_wait] {
+      pin(cpu_id);
+      loop(queue_max_wait);
+    });
+  }
+}
+
+ThreadPool::~ThreadPool()
+{
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+void ThreadPool::pin(unsigned cpu_id)
+{
+  cpu_set_t cs;
+  CPU_ZERO(&cs);
+  CPU_SET(cpu_id, &cs);
+  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+                                                   sizeof(cs), &cs);
+  ceph_assert(r == 0);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+{
+  for (;;) {
+    WorkItem* work_item = nullptr;
+    {
+      std::unique_lock lock{mutex};
+      cond.wait_for(lock, queue_max_wait,
+                    [this, &work_item] {
+        return pending.pop(work_item) || is_stopping();
+      });
+    }
+    if (work_item) {
+      work_item->process();
+    } else if (is_stopping()) {
+      break;
+    }
+  }
+}
+
+seastar::future<> ThreadPool::start()
+{
+  auto slots_per_shard = queue_size / seastar::smp::count;
+  return submit_queue.start(slots_per_shard);
+}
+
+seastar::future<> ThreadPool::stop()
+{
+  return submit_queue.stop().then([this] {
+    stopping = true;
+    cond.notify_all();
+  });
+}
+
+} // namespace crimson::os
diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h
new file mode 100644
index 00000000000..ec3e450a578
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.h
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <tuple>
+#include <type_traits>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson::os {
+
+struct WorkItem {
+  virtual ~WorkItem() {}
+  virtual void process() = 0;
+};
+
+template <typename Func>
+struct Task final : WorkItem {
+  using T = std::invoke_result_t<Func>;
+  using future_state_t = std::conditional_t<std::is_void_v<T>,
+                                            seastar::future_state<>,
+                                            seastar::future_state<T>>;
+  using futurator_t = seastar::futurize<T>;
+public:
+  explicit Task(Func&& f)
+    : func(std::move(f))
+  {}
+  void process() override {
+    try {
+      if constexpr (std::is_void_v<T>) {
+        func();
+        state.set();
+      } else {
+        state.set(func());
+      }
+    } catch (...) {
+      state.set_exception(std::current_exception());
+    }
+    on_done.write_side().signal(1);
+  }
+  typename futurator_t::type get_future() {
+    return on_done.wait().then([this](size_t) {
+      if (state.failed()) {
+        return futurator_t::make_exception_future(state.get_exception());
+      } else {
+        return futurator_t::from_tuple(state.get_value());
+      }
+    });
+  }
+private:
+  Func func;
+  future_state_t state;
+  seastar::readable_eventfd on_done;
+};
+
+struct SubmitQueue {
+  seastar::semaphore free_slots;
+  seastar::gate pending_tasks;
+  explicit SubmitQueue(size_t num_free_slots)
+    : free_slots(num_free_slots)
+  {}
+  seastar::future<> stop() {
+    return pending_tasks.close();
+  }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+  std::atomic<bool> stopping = false;
+  std::mutex mutex;
+  std::condition_variable cond;
+  std::vector<std::thread> threads;
+  seastar::sharded<SubmitQueue> submit_queue;
+  const size_t queue_size;
+  boost::lockfree::queue<WorkItem*> pending;
+
+  void loop(std::chrono::milliseconds queue_max_wait);
+  bool is_stopping() const {
+    return stopping.load(std::memory_order_relaxed);
+  }
+  static void pin(unsigned cpu_id);
+  seastar::semaphore& local_free_slots() {
+    return submit_queue.local().free_slots;
+  }
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param queue_sz the depth of pending queue. before a task is scheduled,
+   *                 it waits in this queue. we will round this number to
+   *                 multiple of the number of cores.
+   * @param n_threads the number of threads in this thread pool.
+   * @param cpu the CPU core to which this thread pool is assigned
+   * @note each @c Task has its own crimson::thread::Condition, which possesses
+   *       an fd, so we should keep the size of queue under a reasonable limit.
+   */
+  ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
+  ~ThreadPool();
+  seastar::future<> start();
+  seastar::future<> stop();
+  template <typename Func, typename...Args>
+  auto submit(Func&& func, Args&&... args) {
+    auto packaged = [func=std::move(func),
+                     args=std::forward_as_tuple(args...)] {
+      return std::apply(std::move(func), std::move(args));
+    };
+    return seastar::with_gate(submit_queue.local().pending_tasks,
+      [packaged=std::move(packaged), this] {
+        return local_free_slots().wait()
+          .then([packaged=std::move(packaged), this] {
+            auto task = new Task{std::move(packaged)};
+            auto fut = task->get_future();
+            pending.push(task);
+            cond.notify_one();
+            return fut.finally([task, this] {
+              local_free_slots().signal();
+              delete task;
+            });
+          });
+      });
+  }
+};
+
+} // namespace crimson::os
diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc
deleted file mode 100644
index 9914d8749f0..00000000000
--- a/src/crimson/thread/ThreadPool.cc
+++ /dev/null
@@ -1,79 +0,0 @@
-#include "ThreadPool.h"
-
-#include <chrono>
-#include <pthread.h>
-#include "include/intarith.h"
-
-#include "include/ceph_assert.h"
-#include "crimson/common/config_proxy.h"
-
-using crimson::common::local_conf;
-
-namespace crimson::thread {
-
-ThreadPool::ThreadPool(size_t n_threads,
-                       size_t queue_sz,
-                       unsigned cpu_id)
-  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
-    pending{queue_size}
-{
-  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
-  for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this, cpu_id, queue_max_wait] {
-      pin(cpu_id);
-      loop(queue_max_wait);
-    });
-  }
-}
-
-ThreadPool::~ThreadPool()
-{
-  for (auto& thread : threads) {
-    thread.join();
-  }
-}
-
-void ThreadPool::pin(unsigned cpu_id)
-{
-  cpu_set_t cs;
-  CPU_ZERO(&cs);
-  CPU_SET(cpu_id, &cs);
-  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
-                                                   sizeof(cs), &cs);
-  ceph_assert(r == 0);
-}
-
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
-{
-  for (;;) {
-    WorkItem* work_item = nullptr;
-    {
-      std::unique_lock lock{mutex};
-      cond.wait_for(lock, queue_max_wait,
-                    [this, &work_item] {
-        return pending.pop(work_item) || is_stopping();
-      });
-    }
-    if (work_item) {
-      work_item->process();
-    } else if (is_stopping()) {
-      break;
-    }
-  }
-}
-
-seastar::future<> ThreadPool::start()
-{
-  auto slots_per_shard = queue_size / seastar::smp::count;
-  return submit_queue.start(slots_per_shard);
-}
-
-seastar::future<> ThreadPool::stop()
-{
-  return submit_queue.stop().then([this] {
-    stopping = true;
-    cond.notify_all();
-  });
-}
-
-} // namespace crimson::thread
diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h
deleted file mode 100644
index 91c08eab988..00000000000
--- a/src/crimson/thread/ThreadPool.h
+++ /dev/null
@@ -1,131 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#pragma once
-
-#include <atomic>
-#include <condition_variable>
-#include <tuple>
-#include <type_traits>
-#include <boost/lockfree/queue.hpp>
-#include <boost/optional.hpp>
-#include <seastar/core/future.hh>
-#include <seastar/core/gate.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/semaphore.hh>
-#include <seastar/core/sharded.hh>
-
-namespace crimson::thread {
-
-struct WorkItem {
-  virtual ~WorkItem() {}
-  virtual void process() = 0;
-};
-
-template <typename Func>
-struct Task final : WorkItem {
-  using T = std::invoke_result_t<Func>;
-  using future_state_t = std::conditional_t<std::is_void_v<T>,
-                                            seastar::future_state<>,
-                                            seastar::future_state<T>>;
-  using futurator_t = seastar::futurize<T>;
-public:
-  explicit Task(Func&& f)
-    : func(std::move(f))
-  {}
-  void process() override {
-    try {
-      if constexpr (std::is_void_v<T>) {
-        func();
-        state.set();
-      } else {
-        state.set(func());
-      }
-    } catch (...) {
-      state.set_exception(std::current_exception());
-    }
-    on_done.write_side().signal(1);
-  }
-  typename futurator_t::type get_future() {
-    return on_done.wait().then([this](size_t) {
-      if (state.failed()) {
-        return futurator_t::make_exception_future(state.get_exception());
-      } else {
-        return futurator_t::from_tuple(state.get_value());
-      }
-    });
-  }
-private:
-  Func func;
-  future_state_t state;
-  seastar::readable_eventfd on_done;
-};
-
-struct SubmitQueue {
-  seastar::semaphore free_slots;
-  seastar::gate pending_tasks;
-  explicit SubmitQueue(size_t num_free_slots)
-    : free_slots(num_free_slots)
-  {}
-  seastar::future<> stop() {
-    return pending_tasks.close();
-  }
-};
-
-/// an engine for scheduling non-seastar tasks from seastar fibers
-class ThreadPool {
-  std::atomic<bool> stopping = false;
-  std::mutex mutex;
-  std::condition_variable cond;
-  std::vector<std::thread> threads;
-  seastar::sharded<SubmitQueue> submit_queue;
-  const size_t queue_size;
-  boost::lockfree::queue<WorkItem*> pending;
-
-  void loop(std::chrono::milliseconds queue_max_wait);
-  bool is_stopping() const {
-    return stopping.load(std::memory_order_relaxed);
-  }
-  static void pin(unsigned cpu_id);
-  seastar::semaphore& local_free_slots() {
-    return submit_queue.local().free_slots;
-  }
-  ThreadPool(const ThreadPool&) = delete;
-  ThreadPool& operator=(const ThreadPool&) = delete;
-public:
-  /**
-   * @param queue_sz the depth of pending queue. before a task is scheduled,
-   *                 it waits in this queue. we will round this number to
-   *                 multiple of the number of cores.
-   * @param n_threads the number of threads in this thread pool.
-   * @param cpu the CPU core to which this thread pool is assigned
-   * @note each @c Task has its own crimson::thread::Condition, which possesses
-   *       an fd, so we should keep the size of queue under a reasonable limit.
-   */
-  ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
-  ~ThreadPool();
-  seastar::future<> start();
-  seastar::future<> stop();
-  template <typename Func, typename...Args>
-  auto submit(Func&& func, Args&&... args) {
-    auto packaged = [func=std::move(func),
-                     args=std::forward_as_tuple(args...)] {
-      return std::apply(std::move(func), std::move(args));
-    };
-    return seastar::with_gate(submit_queue.local().pending_tasks,
-      [packaged=std::move(packaged), this] {
-        return local_free_slots().wait()
-          .then([packaged=std::move(packaged), this] {
-            auto task = new Task{std::move(packaged)};
-            auto fut = task->get_future();
-            pending.push(task);
-            cond.notify_one();
-            return fut.finally([task, this] {
-              local_free_slots().signal();
-              delete task;
-            });
-          });
-      });
-  }
-};
-
-} // namespace crimson::thread
diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt
index 6516a138797..eb1d0fd91c0 100644
--- a/src/test/crimson/CMakeLists.txt
+++ b/src/test/crimson/CMakeLists.txt
@@ -30,11 +30,14 @@ add_executable(unittest_async_echo
   test_async_echo.cc)
 target_link_libraries(unittest_async_echo ceph-common global)
 
-add_executable(unittest_seastar_thread_pool
-  test_thread_pool.cc)
-add_ceph_test(unittest_seastar_thread_pool
-  unittest_seastar_thread_pool --memory 256M --smp 1)
-target_link_libraries(unittest_seastar_thread_pool crimson)
+add_executable(unittest_seastar_alienstore_thread_pool
+  test_alienstore_thread_pool.cc)
+add_ceph_test(unittest_seastar_alienstore_thread_pool
+  unittest_seastar_alienstore_thread_pool --memory 256M --smp 1)
+target_link_libraries(unittest_seastar_alienstore_thread_pool
+  crimson
+  crimson-os
+  crimson-common)
 
 add_executable(unittest_seastar_config
   test_config.cc)
diff --git a/src/test/crimson/test_alienstore_thread_pool.cc b/src/test/crimson/test_alienstore_thread_pool.cc
new file mode 100644
index 00000000000..82b98abbb60
--- /dev/null
+++ b/src/test/crimson/test_alienstore_thread_pool.cc
@@ -0,0 +1,78 @@
+#include <chrono>
+#include <iostream>
+#include <numeric>
+#include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/os/alienstore/thread_pool.h"
+#include "include/msgr.h"
+
+using namespace std::chrono_literals;
+using ThreadPool = crimson::os::ThreadPool;
+using crimson::common::local_conf;
+
+seastar::future<> test_accumulate(ThreadPool& tp) {
+  static constexpr auto N = 5;
+  static constexpr auto M = 1;
+  auto slow_plus = [&tp](int i) {
+    return tp.submit([=] {
+      std::this_thread::sleep_for(10ns);
+      return i + M;
+    });
+  };
+  return seastar::map_reduce(
+    boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
+    auto r = boost::irange(0 + M, N + M);
+    if (sum != std::accumulate(r.begin(), r.end(), 0)) {
+      throw std::runtime_error("test_accumulate failed");
+    }
+  });
+}
+
+seastar::future<> test_void_return(ThreadPool& tp) {
+  return tp.submit([=] {
+    std::this_thread::sleep_for(10ns);
+  });
+}
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  return app.run(argc, argv, [] {
+    std::vector<const char*> args;
+    std::string cluster;
+    std::string conf_file_list;
+    auto init_params = ceph_argparse_early_args(args,
+                                                CEPH_ENTITY_TYPE_CLIENT,
+                                                &cluster,
+                                                &conf_file_list);
+    return crimson::common::sharded_conf().start(init_params.name, cluster)
+      .then([conf_file_list] {
+        return local_conf().parse_config_files(conf_file_list);
+      }).then([] {
+        return seastar::do_with(std::make_unique<ThreadPool>(2, 128, 0),
+                                [](auto& tp) {
+          return tp->start().then([&tp] {
+            return test_accumulate(*tp);
+          }).then([&tp] {
+            return test_void_return(*tp);
+          }).finally([&tp] {
+            return tp->stop();
+          });
+        });
+      }).finally([] {
+        return crimson::common::sharded_conf().stop();
+      }).handle_exception([](auto e) {
+        std::cerr << "Error: " << e << std::endl;
+        seastar::engine().exit(1);
+      });
+  });
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ *    -C ../../../build \
+ *    unittest_seastar_thread_pool"
+ * End:
+ */
diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc
deleted file mode 100644
index 962470e3b69..00000000000
--- a/src/test/crimson/test_thread_pool.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-#include <chrono>
-#include <iostream>
-#include <numeric>
-#include <seastar/core/app-template.hh>
-#include "common/ceph_argparse.h"
-#include "crimson/common/config_proxy.h"
-#include "crimson/thread/ThreadPool.h"
-#include "include/msgr.h"
-
-using namespace std::chrono_literals;
-using ThreadPool = crimson::thread::ThreadPool;
-using crimson::common::local_conf;
-
-seastar::future<> test_accumulate(ThreadPool& tp) {
-  static constexpr auto N = 5;
-  static constexpr auto M = 1;
-  auto slow_plus = [&tp](int i) {
-    return tp.submit([=] {
-      std::this_thread::sleep_for(10ns);
-      return i + M;
-    });
-  };
-  return seastar::map_reduce(
-    boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
-    auto r = boost::irange(0 + M, N + M);
-    if (sum != std::accumulate(r.begin(), r.end(), 0)) {
-      throw std::runtime_error("test_accumulate failed");
-    }
-  });
-}
-
-seastar::future<> test_void_return(ThreadPool& tp) {
-  return tp.submit([=] {
-    std::this_thread::sleep_for(10ns);
-  });
-}
-
-int main(int argc, char** argv)
-{
-  seastar::app_template app;
-  return app.run(argc, argv, [] {
-    std::vector<const char*> args;
-    std::string cluster;
-    std::string conf_file_list;
-    auto init_params = ceph_argparse_early_args(args,
-                                                CEPH_ENTITY_TYPE_CLIENT,
-                                                &cluster,
-                                                &conf_file_list);
-    return crimson::common::sharded_conf().start(init_params.name, cluster)
-      .then([conf_file_list] {
-        return local_conf().parse_config_files(conf_file_list);
-      }).then([] {
-        return seastar::do_with(std::make_unique<ThreadPool>(2, 128, 0),
-                                [](auto& tp) {
-          return tp->start().then([&tp] {
-            return test_accumulate(*tp);
-          }).then([&tp] {
-            return test_void_return(*tp);
-          }).finally([&tp] {
-            return tp->stop();
-          });
-        });
-      }).finally([] {
-        return crimson::common::sharded_conf().stop();
-      }).handle_exception([](auto e) {
-        std::cerr << "Error: " << e << std::endl;
-        seastar::engine().exit(1);
-      });
-  });
-}
-
-/*
- * Local Variables:
- * compile-command: "make -j4 \
- *    -C ../../../build \
- *    unittest_seastar_thread_pool"
- * End:
- */