]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson: make ThreadPool specific to AlienStore.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 18 Jun 2020 13:40:43 +0000 (15:40 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 18 Jun 2020 14:15:45 +0000 (16:15 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/CMakeLists.txt
src/crimson/os/alienstore/CMakeLists.txt
src/crimson/os/alienstore/alien_store.cc
src/crimson/os/alienstore/alien_store.h
src/crimson/os/alienstore/thread_pool.cc [new file with mode: 0644]
src/crimson/os/alienstore/thread_pool.h [new file with mode: 0644]
src/crimson/thread/ThreadPool.cc [deleted file]
src/crimson/thread/ThreadPool.h [deleted file]
src/test/crimson/CMakeLists.txt
src/test/crimson/test_alienstore_thread_pool.cc [new file with mode: 0644]
src/test/crimson/test_thread_pool.cc [deleted file]

index e30aef360fecfb16c2798f688a19a8ad8f959269..f11718fdeb2b76ee3a50d3add071ecdba682ffd6 100644 (file)
@@ -167,7 +167,6 @@ set(crimson_net_srcs
   net/ProtocolV2.cc
   net/chained_dispatchers.cc)
 set(crimson_thread_srcs
-  thread/ThreadPool.cc
   thread/Throttle.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
index e3343d52d6fe9dd83d3a6c44774587dc89c92490..f616dcc819080e04a21d41e96ba6095960847643 100644 (file)
@@ -1,6 +1,7 @@
 include_directories(SYSTEM "${CMAKE_SOURCE_DIR}/src/rocksdb/include")
 set(crimson_alien_srcs
-    alien_store.cc)
+    alien_store.cc
+    thread_pool.cc)
 
 list(APPEND crimson_alien_srcs
   ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
index 93105a642217546b7503435a877c3905fd96d93f..27395ae5cafdaa73c46a72e2da6c976c0adbd719 100644 (file)
@@ -63,7 +63,7 @@ AlienStore::AlienStore(const std::string& path, const ConfigValues& values)
   g_ceph_context = cct.get();
   cct->_conf.set_config_values(values);
   store = std::make_unique<BlueStore>(cct.get(), path);
-  tp = std::make_unique<crimson::thread::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
+  tp = std::make_unique<crimson::os::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
 }
 
 seastar::future<> AlienStore::start()
index 3552274344ce31a1ad63a9caeb466c4fd98792c9..f862d7b8409c6cc7c4fa96b42ed8370dd253d3b2 100644 (file)
@@ -10,9 +10,9 @@
 #include "os/ObjectStore.h"
 #include "osd/osd_types.h"
 
+#include "crimson/os/alienstore/thread_pool.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
-#include "crimson/thread/ThreadPool.h"
 
 namespace ceph::os {
 class Transaction;
@@ -38,7 +38,7 @@ public:
     ObjectMap::ObjectMapIterator iter;
     AlienStore* store;
   };
-  mutable std::unique_ptr<crimson::thread::ThreadPool> tp;
+  mutable std::unique_ptr<crimson::os::ThreadPool> tp;
   AlienStore(const std::string& path, const ConfigValues& values);
   ~AlienStore() final;
 
diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc
new file mode 100644 (file)
index 0000000..468c733
--- /dev/null
@@ -0,0 +1,79 @@
+#include "thread_pool.h"
+
+#include <chrono>
+#include <pthread.h>
+#include "include/intarith.h"
+
+#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::os {
+
+ThreadPool::ThreadPool(size_t n_threads,
+                       size_t queue_sz,
+                       unsigned cpu_id)
+  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending{queue_size}
+{
+  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
+  for (size_t i = 0; i < n_threads; i++) {
+    threads.emplace_back([this, cpu_id, queue_max_wait] {
+      pin(cpu_id);
+      loop(queue_max_wait);
+    });
+  }
+}
+
+ThreadPool::~ThreadPool()
+{
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+void ThreadPool::pin(unsigned cpu_id)
+{
+  cpu_set_t cs;
+  CPU_ZERO(&cs);
+  CPU_SET(cpu_id, &cs);
+  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+                                                   sizeof(cs), &cs);
+  ceph_assert(r == 0);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+{
+  for (;;) {
+    WorkItem* work_item = nullptr;
+    {
+      std::unique_lock lock{mutex};
+      cond.wait_for(lock, queue_max_wait,
+                    [this, &work_item] {
+        return pending.pop(work_item) || is_stopping();
+      });
+    }
+    if (work_item) {
+      work_item->process();
+    } else if (is_stopping()) {
+      break;
+    }
+  }
+}
+
+seastar::future<> ThreadPool::start()
+{
+  auto slots_per_shard = queue_size / seastar::smp::count;
+  return submit_queue.start(slots_per_shard);
+}
+
+seastar::future<> ThreadPool::stop()
+{
+  return submit_queue.stop().then([this] {
+    stopping = true;
+    cond.notify_all();
+  });
+}
+
+} // namespace crimson::os
diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h
new file mode 100644 (file)
index 0000000..ec3e450
--- /dev/null
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <tuple>
+#include <type_traits>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson::os {
+
+struct WorkItem {
+  virtual ~WorkItem() {}
+  virtual void process() = 0;
+};
+
+template<typename Func>
+struct Task final : WorkItem {
+  using T = std::invoke_result_t<Func>;
+  using future_state_t = std::conditional_t<std::is_void_v<T>,
+                                           seastar::future_state<>,
+                                           seastar::future_state<T>>;
+  using futurator_t = seastar::futurize<T>;
+public:
+  explicit Task(Func&& f)
+    : func(std::move(f))
+  {}
+  void process() override {
+    try {
+      if constexpr (std::is_void_v<T>) {
+        func();
+        state.set();
+      } else {
+        state.set(func());
+      }
+    } catch (...) {
+      state.set_exception(std::current_exception());
+    }
+    on_done.write_side().signal(1);
+  }
+  typename futurator_t::type get_future() {
+    return on_done.wait().then([this](size_t) {
+      if (state.failed()) {
+       return futurator_t::make_exception_future(state.get_exception());
+      } else {
+       return futurator_t::from_tuple(state.get_value());
+      }
+    });
+  }
+private:
+  Func func;
+  future_state_t state;
+  seastar::readable_eventfd on_done;
+};
+
+struct SubmitQueue {
+  seastar::semaphore free_slots;
+  seastar::gate pending_tasks;
+  explicit SubmitQueue(size_t num_free_slots)
+    : free_slots(num_free_slots)
+  {}
+  seastar::future<> stop() {
+    return pending_tasks.close();
+  }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+  std::atomic<bool> stopping = false;
+  std::mutex mutex;
+  std::condition_variable cond;
+  std::vector<std::thread> threads;
+  seastar::sharded<SubmitQueue> submit_queue;
+  const size_t queue_size;
+  boost::lockfree::queue<WorkItem*> pending;
+
+  void loop(std::chrono::milliseconds queue_max_wait);
+  bool is_stopping() const {
+    return stopping.load(std::memory_order_relaxed);
+  }
+  static void pin(unsigned cpu_id);
+  seastar::semaphore& local_free_slots() {
+    return submit_queue.local().free_slots;
+  }
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param queue_sz the depth of pending queue. before a task is scheduled,
+   *                 it waits in this queue. we will round this number to
+   *                 multiple of the number of cores.
+   * @param n_threads the number of threads in this thread pool.
+   * @param cpu the CPU core to which this thread pool is assigned
+   * @note each @c Task has its own crimson::thread::Condition, which possesses
+   * an fd, so we should keep the size of queue under a reasonable limit.
+   */
+  ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
+  ~ThreadPool();
+  seastar::future<> start();
+  seastar::future<> stop();
+  template<typename Func, typename...Args>
+  auto submit(Func&& func, Args&&... args) {
+    auto packaged = [func=std::move(func),
+                     args=std::forward_as_tuple(args...)] {
+      return std::apply(std::move(func), std::move(args));
+    };
+    return seastar::with_gate(submit_queue.local().pending_tasks,
+      [packaged=std::move(packaged), this] {
+        return local_free_slots().wait()
+          .then([packaged=std::move(packaged), this] {
+            auto task = new Task{std::move(packaged)};
+            auto fut = task->get_future();
+            pending.push(task);
+            cond.notify_one();
+            return fut.finally([task, this] {
+              local_free_slots().signal();
+              delete task;
+            });
+          });
+        });
+  }
+};
+
+} // namespace crimson::os
diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc
deleted file mode 100644 (file)
index 9914d87..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
-#include "ThreadPool.h"
-
-#include <chrono>
-#include <pthread.h>
-#include "include/intarith.h"
-
-#include "include/ceph_assert.h"
-#include "crimson/common/config_proxy.h"
-
-using crimson::common::local_conf;
-
-namespace crimson::thread {
-
-ThreadPool::ThreadPool(size_t n_threads,
-                       size_t queue_sz,
-                       unsigned cpu_id)
-  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
-    pending{queue_size}
-{
-  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
-  for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this, cpu_id, queue_max_wait] {
-      pin(cpu_id);
-      loop(queue_max_wait);
-    });
-  }
-}
-
-ThreadPool::~ThreadPool()
-{
-  for (auto& thread : threads) {
-    thread.join();
-  }
-}
-
-void ThreadPool::pin(unsigned cpu_id)
-{
-  cpu_set_t cs;
-  CPU_ZERO(&cs);
-  CPU_SET(cpu_id, &cs);
-  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
-                                                   sizeof(cs), &cs);
-  ceph_assert(r == 0);
-}
-
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
-{
-  for (;;) {
-    WorkItem* work_item = nullptr;
-    {
-      std::unique_lock lock{mutex};
-      cond.wait_for(lock, queue_max_wait,
-                    [this, &work_item] {
-        return pending.pop(work_item) || is_stopping();
-      });
-    }
-    if (work_item) {
-      work_item->process();
-    } else if (is_stopping()) {
-      break;
-    }
-  }
-}
-
-seastar::future<> ThreadPool::start()
-{
-  auto slots_per_shard = queue_size / seastar::smp::count;
-  return submit_queue.start(slots_per_shard);
-}
-
-seastar::future<> ThreadPool::stop()
-{
-  return submit_queue.stop().then([this] {
-    stopping = true;
-    cond.notify_all();
-  });
-}
-
-} // namespace crimson::thread
diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h
deleted file mode 100644 (file)
index 91c08ea..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-#pragma once
-
-#include <atomic>
-#include <condition_variable>
-#include <tuple>
-#include <type_traits>
-#include <boost/lockfree/queue.hpp>
-#include <boost/optional.hpp>
-#include <seastar/core/future.hh>
-#include <seastar/core/gate.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/semaphore.hh>
-#include <seastar/core/sharded.hh>
-
-namespace crimson::thread {
-
-struct WorkItem {
-  virtual ~WorkItem() {}
-  virtual void process() = 0;
-};
-
-template<typename Func>
-struct Task final : WorkItem {
-  using T = std::invoke_result_t<Func>;
-  using future_state_t = std::conditional_t<std::is_void_v<T>,
-                                           seastar::future_state<>,
-                                           seastar::future_state<T>>;
-  using futurator_t = seastar::futurize<T>;
-public:
-  explicit Task(Func&& f)
-    : func(std::move(f))
-  {}
-  void process() override {
-    try {
-      if constexpr (std::is_void_v<T>) {
-        func();
-        state.set();
-      } else {
-        state.set(func());
-      }
-    } catch (...) {
-      state.set_exception(std::current_exception());
-    }
-    on_done.write_side().signal(1);
-  }
-  typename futurator_t::type get_future() {
-    return on_done.wait().then([this](size_t) {
-      if (state.failed()) {
-       return futurator_t::make_exception_future(state.get_exception());
-      } else {
-       return futurator_t::from_tuple(state.get_value());
-      }
-    });
-  }
-private:
-  Func func;
-  future_state_t state;
-  seastar::readable_eventfd on_done;
-};
-
-struct SubmitQueue {
-  seastar::semaphore free_slots;
-  seastar::gate pending_tasks;
-  explicit SubmitQueue(size_t num_free_slots)
-    : free_slots(num_free_slots)
-  {}
-  seastar::future<> stop() {
-    return pending_tasks.close();
-  }
-};
-
-/// an engine for scheduling non-seastar tasks from seastar fibers
-class ThreadPool {
-  std::atomic<bool> stopping = false;
-  std::mutex mutex;
-  std::condition_variable cond;
-  std::vector<std::thread> threads;
-  seastar::sharded<SubmitQueue> submit_queue;
-  const size_t queue_size;
-  boost::lockfree::queue<WorkItem*> pending;
-
-  void loop(std::chrono::milliseconds queue_max_wait);
-  bool is_stopping() const {
-    return stopping.load(std::memory_order_relaxed);
-  }
-  static void pin(unsigned cpu_id);
-  seastar::semaphore& local_free_slots() {
-    return submit_queue.local().free_slots;
-  }
-  ThreadPool(const ThreadPool&) = delete;
-  ThreadPool& operator=(const ThreadPool&) = delete;
-public:
-  /**
-   * @param queue_sz the depth of pending queue. before a task is scheduled,
-   *                 it waits in this queue. we will round this number to
-   *                 multiple of the number of cores.
-   * @param n_threads the number of threads in this thread pool.
-   * @param cpu the CPU core to which this thread pool is assigned
-   * @note each @c Task has its own crimson::thread::Condition, which possesses
-   * an fd, so we should keep the size of queue under a reasonable limit.
-   */
-  ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
-  ~ThreadPool();
-  seastar::future<> start();
-  seastar::future<> stop();
-  template<typename Func, typename...Args>
-  auto submit(Func&& func, Args&&... args) {
-    auto packaged = [func=std::move(func),
-                     args=std::forward_as_tuple(args...)] {
-      return std::apply(std::move(func), std::move(args));
-    };
-    return seastar::with_gate(submit_queue.local().pending_tasks,
-      [packaged=std::move(packaged), this] {
-        return local_free_slots().wait()
-          .then([packaged=std::move(packaged), this] {
-            auto task = new Task{std::move(packaged)};
-            auto fut = task->get_future();
-            pending.push(task);
-            cond.notify_one();
-            return fut.finally([task, this] {
-              local_free_slots().signal();
-              delete task;
-            });
-          });
-        });
-  }
-};
-
-} // namespace crimson::thread
index 6516a13879732fb176afca1290ba4dfec7f63f74..eb1d0fd91c01062d619d3d99e6b129a2675def72 100644 (file)
@@ -30,11 +30,14 @@ add_executable(unittest_async_echo
   test_async_echo.cc)
 target_link_libraries(unittest_async_echo ceph-common global)
 
-add_executable(unittest_seastar_thread_pool
-  test_thread_pool.cc)
-add_ceph_test(unittest_seastar_thread_pool
-  unittest_seastar_thread_pool --memory 256M --smp 1)
-target_link_libraries(unittest_seastar_thread_pool crimson)
+add_executable(unittest_seastar_alienstore_thread_pool
+  test_alienstore_thread_pool.cc)
+add_ceph_test(unittest_seastar_alienstore_thread_pool
+  unittest_seastar_alienstore_thread_pool --memory 256M --smp 1)
+target_link_libraries(unittest_seastar_alienstore_thread_pool
+  crimson
+  crimson-os
+  crimson-common)
 
 add_executable(unittest_seastar_config
   test_config.cc)
diff --git a/src/test/crimson/test_alienstore_thread_pool.cc b/src/test/crimson/test_alienstore_thread_pool.cc
new file mode 100644 (file)
index 0000000..82b98ab
--- /dev/null
@@ -0,0 +1,78 @@
+#include <chrono>
+#include <iostream>
+#include <numeric>
+#include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/os/alienstore/thread_pool.h"
+#include "include/msgr.h"
+
+using namespace std::chrono_literals;
+using ThreadPool = crimson::os::ThreadPool;
+using crimson::common::local_conf;
+
+seastar::future<> test_accumulate(ThreadPool& tp) {
+  static constexpr auto N = 5;
+  static constexpr auto M = 1;
+  auto slow_plus = [&tp](int i) {
+    return tp.submit([=] {
+      std::this_thread::sleep_for(10ns);
+      return i + M;
+    });
+  };
+  return seastar::map_reduce(
+    boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
+    auto r = boost::irange(0 + M, N + M);
+    if (sum != std::accumulate(r.begin(), r.end(), 0)) {
+      throw std::runtime_error("test_accumulate failed");
+    }
+  });
+}
+
+seastar::future<> test_void_return(ThreadPool& tp) {
+  return tp.submit([=] {
+    std::this_thread::sleep_for(10ns);
+  });
+}
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  return app.run(argc, argv, [] {
+    std::vector<const char*> args;
+    std::string cluster;
+    std::string conf_file_list;
+    auto init_params = ceph_argparse_early_args(args,
+                                                CEPH_ENTITY_TYPE_CLIENT,
+                                                &cluster,
+                                                &conf_file_list);
+    return crimson::common::sharded_conf().start(init_params.name, cluster)
+    .then([conf_file_list] {
+      return local_conf().parse_config_files(conf_file_list);
+    }).then([] {
+      return seastar::do_with(std::make_unique<crimson::os::ThreadPool>(2, 128, 0),
+                              [](auto& tp) {
+        return tp->start().then([&tp] {
+          return test_accumulate(*tp);
+        }).then([&tp] {
+          return test_void_return(*tp);
+        }).finally([&tp] {
+          return tp->stop();
+        });
+      });
+    }).finally([] {
+      return crimson::common::sharded_conf().stop();
+    }).handle_exception([](auto e) {
+      std::cerr << "Error: " << e << std::endl;
+      seastar::engine().exit(1);
+    });
+  });
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_thread_pool"
+ * End:
+ */
diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc
deleted file mode 100644 (file)
index 962470e..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-#include <chrono>
-#include <iostream>
-#include <numeric>
-#include <seastar/core/app-template.hh>
-#include "common/ceph_argparse.h"
-#include "crimson/common/config_proxy.h"
-#include "crimson/thread/ThreadPool.h"
-#include "include/msgr.h"
-
-using namespace std::chrono_literals;
-using ThreadPool = crimson::thread::ThreadPool;
-using crimson::common::local_conf;
-
-seastar::future<> test_accumulate(ThreadPool& tp) {
-  static constexpr auto N = 5;
-  static constexpr auto M = 1;
-  auto slow_plus = [&tp](int i) {
-    return tp.submit([=] {
-      std::this_thread::sleep_for(10ns);
-      return i + M;
-    });
-  };
-  return seastar::map_reduce(
-    boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
-    auto r = boost::irange(0 + M, N + M);
-    if (sum != std::accumulate(r.begin(), r.end(), 0)) {
-      throw std::runtime_error("test_accumulate failed");
-    }
-  });
-}
-
-seastar::future<> test_void_return(ThreadPool& tp) {
-  return tp.submit([=] {
-    std::this_thread::sleep_for(10ns);
-  });
-}
-
-int main(int argc, char** argv)
-{
-  seastar::app_template app;
-  return app.run(argc, argv, [] {
-    std::vector<const char*> args;
-    std::string cluster;
-    std::string conf_file_list;
-    auto init_params = ceph_argparse_early_args(args,
-                                                CEPH_ENTITY_TYPE_CLIENT,
-                                                &cluster,
-                                                &conf_file_list);
-    return crimson::common::sharded_conf().start(init_params.name, cluster)
-    .then([conf_file_list] {
-      return local_conf().parse_config_files(conf_file_list);
-    }).then([] {
-      return seastar::do_with(std::make_unique<crimson::thread::ThreadPool>(2, 128, 0),
-                              [](auto& tp) {
-        return tp->start().then([&tp] {
-          return test_accumulate(*tp);
-        }).then([&tp] {
-          return test_void_return(*tp);
-        }).finally([&tp] {
-          return tp->stop();
-        });
-      });
-    }).finally([] {
-      return crimson::common::sharded_conf().stop();
-    }).handle_exception([](auto e) {
-      std::cerr << "Error: " << e << std::endl;
-      seastar::engine().exit(1);
-    });
-  });
-}
-
-/*
- * Local Variables:
- * compile-command: "make -j4 \
- * -C ../../../build \
- * unittest_seastar_thread_pool"
- * End:
- */