]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/alienstore: shard objectstore ops to threads by pgid
authorXuehan Xu <xxhdx1985126@gmail.com>
Mon, 1 Mar 2021 04:05:05 +0000 (12:05 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Wed, 17 Mar 2021 02:39:24 +0000 (10:39 +0800)
Avoid synchronize all alienstore worker thread through a common condition variable,
which would cause performance degradation.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/os/alienstore/alien_store.cc
src/crimson/os/alienstore/alien_store.h
src/crimson/os/alienstore/thread_pool.cc
src/crimson/os/alienstore/thread_pool.h
src/test/crimson/test_alienstore_thread_pool.cc

index 6b789e4974288ff93db5c42954bb5e1f2ffb307e..ceac8cdafb80b494e850f73e31fbb0d8f4a707eb 100644 (file)
@@ -133,7 +133,8 @@ AlienStore::list_objects(CollectionRef ch,
   return seastar::do_with(std::vector<ghobject_t>(), ghobject_t(),
                           [=] (auto &objects, auto &next) {
     objects.reserve(limit);
-    return tp->submit([=, &objects, &next] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+      [=, &objects, &next] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->collection_list(c->collection, start, end,
                                     store->get_ideal_list_max(),
@@ -214,7 +215,7 @@ AlienStore::read(CollectionRef ch,
 {
   logger().debug("{}", __func__);
   return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) {
-    return tp->submit([=, &bl] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->read(c->collection, oid, offset, len, bl, op_flags);
     }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
@@ -238,7 +239,8 @@ AlienStore::readv(CollectionRef ch,
   logger().debug("{}", __func__);
   return seastar::do_with(ceph::bufferlist{},
     [this, ch, oid, &m, op_flags](auto& bl) {
-    return tp->submit([this, ch, oid, &m, op_flags, &bl] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+      [this, ch, oid, &m, op_flags, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->readv(c->collection, oid, m, bl, op_flags);
     }).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
@@ -260,7 +262,7 @@ AlienStore::get_attr(CollectionRef ch,
 {
   logger().debug("{}", __func__);
   return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) {
-    return tp->submit([=, &value] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &value] {
       auto c =static_cast<AlienCollection*>(ch.get());
       return store->getattr(c->collection, oid,
                            static_cast<std::string>(name).c_str(), value);
@@ -283,7 +285,7 @@ AlienStore::get_attrs(CollectionRef ch,
 {
   logger().debug("{}", __func__);
   return seastar::do_with(attrs_t{}, [=] (auto &aset) {
-    return tp->submit([=, &aset] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &aset] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->getattrs(c->collection, oid,
                             reinterpret_cast<map<string,bufferptr>&>(aset));
@@ -304,7 +306,7 @@ auto AlienStore::omap_get_values(CollectionRef ch,
 {
   logger().debug("{}", __func__);
   return seastar::do_with(omap_values_t{}, [=] (auto &values) {
-    return tp->submit([=, &values] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_values(c->collection, oid, keys,
                                    reinterpret_cast<map<string, bufferlist>*>(&values));
@@ -326,7 +328,7 @@ auto AlienStore::omap_get_values(CollectionRef ch,
 {
   logger().debug("{} with_start", __func__);
   return seastar::do_with(omap_values_t{}, [=] (auto &values) {
-    return tp->submit([=, &values] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_values(c->collection, oid, start,
                                    reinterpret_cast<map<string, bufferlist>*>(&values));
@@ -360,7 +362,8 @@ seastar::future<> AlienStore::do_transaction(CollectionRef ch,
        return alien_coll->with_lock([this, ch, id, &txn, &done] {
          Context *crimson_wrapper =
            ceph::os::Transaction::collect_all_contexts(txn);
-         return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
+         return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+           [this, ch, id, crimson_wrapper, &txn, &done] {
            txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
            auto c = static_cast<AlienCollection*>(ch.get());
            return store->queue_transaction(c->collection, std::move(txn));
@@ -436,7 +439,7 @@ seastar::future<struct stat> AlienStore::stat(
   const ghobject_t& oid)
 {
   return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
-    return tp->submit([this, ch, oid, &st] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [this, ch, oid, &st] {
       auto c = static_cast<AlienCollection*>(ch.get());
       store->stat(c->collection, oid, &st);
       return st;
@@ -449,7 +452,7 @@ auto AlienStore::omap_get_header(CollectionRef ch,
   -> read_errorator::future<ceph::bufferlist>
 {
   return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
-    return tp->submit([=, &bl] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_header(c->collection, oid, &bl);
     }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
@@ -472,7 +475,7 @@ seastar::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
   uint64_t len)
 {
   return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
-    return tp->submit([=, &destmap] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &destmap] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->fiemap(c->collection, oid, off, len, destmap);
     }).then([&destmap] (int i) {
@@ -487,12 +490,14 @@ seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
   CollectionRef ch,
   const ghobject_t& oid)
 {
-  return tp->submit([=] {
+  return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+    [this, ch, oid] {
     auto c = static_cast<AlienCollection*>(ch.get());
     auto iter = store->get_omap_iterator(c->collection, oid);
     return FuturizedStore::OmapIteratorRef(
              new AlienStore::AlienOmapIterator(iter,
-                                               this));
+                                               this,
+                                               ch));
   });
 }
 
@@ -500,7 +505,8 @@ seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
 //      needs further optimization.
 seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
 {
-  return store->tp->submit([=] {
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this] {
     return iter->seek_to_first();
   }).then([] (int r) {
     assert(r == 0);
@@ -511,7 +517,8 @@ seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
 seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
   const std::string& after)
 {
-  return store->tp->submit([this, after] {
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this, after] {
     return iter->upper_bound(after);
   }).then([] (int r) {
     assert(r == 0);
@@ -522,7 +529,8 @@ seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
 seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
   const std::string& to)
 {
-  return store->tp->submit([this, to] {
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this, to] {
     return iter->lower_bound(to);
   }).then([] (int r) {
     assert(r == 0);
@@ -532,7 +540,8 @@ seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
 
 seastar::future<> AlienStore::AlienOmapIterator::next()
 {
-  return store->tp->submit([this] {
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this] {
     return iter->next();
   }).then([] (int r) {
     assert(r == 0);
index c94da3ee580b9e5f8c62c4f7a1460be92697fb2d..08790c5fdfa6eb94023553bc4fbf27f62cf1e6d8 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #pragma once
 
@@ -24,7 +24,8 @@ public:
   class AlienOmapIterator final : public OmapIterator {
   public:
     AlienOmapIterator(ObjectMap::ObjectMapIterator& it,
-       AlienStore* store) : iter(it), store(store) {}
+        AlienStore* store, const CollectionRef& ch)
+      : iter(it), store(store), ch(ch) {}
     seastar::future<> seek_to_first();
     seastar::future<> upper_bound(const std::string& after);
     seastar::future<> lower_bound(const std::string& to);
@@ -36,6 +37,7 @@ public:
   private:
     ObjectMap::ObjectMapIterator iter;
     AlienStore* store;
+    CollectionRef ch;
   };
   AlienStore(const std::string& path, const ConfigValues& values);
   ~AlienStore() final;
index c42947bfa2fb24b0169311d0175d57b189d7b5cb..4b81dabd802502c25064a1e080ffe866e1f171cd 100644 (file)
@@ -16,16 +16,18 @@ namespace crimson::os {
 ThreadPool::ThreadPool(size_t n_threads,
                        size_t queue_sz,
                        std::vector<uint64_t> cpus)
-  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
-    pending{queue_size}
+  : n_threads(n_threads),
+    queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending_queues(n_threads)
 {
   auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
   for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this, cpus, queue_max_wait] {
+    threads.emplace_back([this, cpus, queue_max_wait, i] {
       if (!cpus.empty()) {
         pin(cpus);
       }
-      loop(queue_max_wait);
+      (void) pthread_setname_np(pthread_self(), "alien-store-tp");
+      loop(queue_max_wait, i);
     });
   }
 }
@@ -49,17 +51,12 @@ void ThreadPool::pin(const std::vector<uint64_t>& cpus)
   ceph_assert(r == 0);
 }
 
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait, size_t shard)
 {
+  auto& pending = pending_queues[shard];
   for (;;) {
     WorkItem* work_item = nullptr;
-    {
-      std::unique_lock lock{mutex};
-      cond.wait_for(lock, queue_max_wait,
-                    [this, &work_item] {
-        return pending.pop(work_item) || is_stopping();
-      });
-    }
+    work_item = pending.pop_front(queue_max_wait);
     if (work_item) {
       work_item->process();
     } else if (is_stopping()) {
@@ -78,7 +75,9 @@ seastar::future<> ThreadPool::stop()
 {
   return submit_queue.stop().then([this] {
     stopping = true;
-    cond.notify_all();
+    for (auto& q : pending_queues) {
+      q.stop();
+    }
   });
 }
 
index 8b66725dd70953b2f6bd8d7baaf86b01b9f5bb29..bbc1430f75a2284b4017ea60b6bb1aad7459e56b 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 #pragma once
 
 #include <atomic>
@@ -72,17 +72,51 @@ struct SubmitQueue {
   }
 };
 
+struct ShardedWorkQueue {
+public:
+  WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) {
+    WorkItem* work_item = nullptr;
+    std::unique_lock lock{mutex};
+    cond.wait_for(lock, queue_max_wait,
+                 [this, &work_item] {
+      bool empty = true;
+      if (!pending.empty()) {
+        empty = false;
+        work_item = pending.front();
+        pending.pop_front();
+      }
+      return !empty || is_stopping();
+    });
+    return work_item;
+  }
+  void stop() {
+    stopping = true;
+    cond.notify_all();
+  }
+  void push_back(WorkItem* work_item) {
+    pending.push_back(work_item);
+    cond.notify_one();
+  }
+private:
+  bool is_stopping() const {
+    return stopping;
+  }
+  bool stopping = false;
+  std::mutex mutex;
+  std::condition_variable cond;
+  std::deque<WorkItem*> pending;
+};
+
 /// an engine for scheduling non-seastar tasks from seastar fibers
 class ThreadPool {
+  size_t n_threads;
   std::atomic<bool> stopping = false;
-  std::mutex mutex;
-  std::condition_variable cond;
   std::vector<std::thread> threads;
   seastar::sharded<SubmitQueue> submit_queue;
   const size_t queue_size;
-  boost::lockfree::queue<WorkItem*> pending;
+  std::vector<ShardedWorkQueue> pending_queues;
 
-  void loop(std::chrono::milliseconds queue_max_wait);
+  void loop(std::chrono::milliseconds queue_max_wait, size_t shard);
   bool is_stopping() const {
     return stopping.load(std::memory_order_relaxed);
   }
@@ -106,20 +140,22 @@ public:
   ~ThreadPool();
   seastar::future<> start();
   seastar::future<> stop();
+  size_t size() {
+    return n_threads;
+  }
   template<typename Func, typename...Args>
-  auto submit(Func&& func, Args&&... args) {
+  auto submit(int shard, Func&& func, Args&&... args) {
     auto packaged = [func=std::move(func),
                      args=std::forward_as_tuple(args...)] {
       return std::apply(std::move(func), std::move(args));
     };
     return seastar::with_gate(submit_queue.local().pending_tasks,
-      [packaged=std::move(packaged), this] {
+      [packaged=std::move(packaged), shard, this] {
         return local_free_slots().wait()
-          .then([packaged=std::move(packaged), this] {
+          .then([packaged=std::move(packaged), shard, this] {
             auto task = new Task{std::move(packaged)};
             auto fut = task->get_future();
-            pending.push(task);
-            cond.notify_one();
+           pending_queues[shard].push_back(task);
             return fut.finally([task, this] {
               local_free_slots().signal();
               delete task;
@@ -127,6 +163,12 @@ public:
           });
         });
   }
+
+  template<typename Func>
+  auto submit(Func&& func) {
+    return submit(::rand() % n_threads,
+                 std::forward<Func>(func));
+  }
 };
 
 } // namespace crimson::os
index 82b98abbb60c879cba22916efc21527d34370eab..7cfffec757b7421eac01709a8b8dbd1882c06baf 100644 (file)
@@ -15,7 +15,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) {
   static constexpr auto N = 5;
   static constexpr auto M = 1;
   auto slow_plus = [&tp](int i) {
-    return tp.submit([=] {
+    return tp.submit(::rand() % 2, [=] {
       std::this_thread::sleep_for(10ns);
       return i + M;
     });
@@ -30,7 +30,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) {
 }
 
 seastar::future<> test_void_return(ThreadPool& tp) {
-  return tp.submit([=] {
+  return tp.submit(::rand() % 2, [=] {
     std::this_thread::sleep_for(10ns);
   });
 }
@@ -50,7 +50,7 @@ int main(int argc, char** argv)
     .then([conf_file_list] {
       return local_conf().parse_config_files(conf_file_list);
     }).then([] {
-      return seastar::do_with(std::make_unique<crimson::os::ThreadPool>(2, 128, 0),
+      return seastar::do_with(std::make_unique<crimson::os::ThreadPool>(2, 128, (std::vector<uint64_t>){0}),
                               [](auto& tp) {
         return tp->start().then([&tp] {
           return test_accumulate(*tp);