From c52dc96c52cb9bacc9b1be8428b74cf3a21e725f Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Mon, 1 Mar 2021 12:05:05 +0800 Subject: [PATCH] crimson/os/alienstore: shard objectstore ops to threads by pgid Avoid synchronizing all alienstore worker threads through a common condition variable, which would cause performance degradation. Signed-off-by: Xuehan Xu --- src/crimson/os/alienstore/alien_store.cc | 43 ++++++++----- src/crimson/os/alienstore/alien_store.h | 8 ++- src/crimson/os/alienstore/thread_pool.cc | 25 ++++---- src/crimson/os/alienstore/thread_pool.h | 64 +++++++++++++++---- .../crimson/test_alienstore_thread_pool.cc | 6 +- 5 files changed, 99 insertions(+), 47 deletions(-) diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc index 6b789e4974288..ceac8cdafb80b 100644 --- a/src/crimson/os/alienstore/alien_store.cc +++ b/src/crimson/os/alienstore/alien_store.cc @@ -133,7 +133,8 @@ AlienStore::list_objects(CollectionRef ch, return seastar::do_with(std::vector(), ghobject_t(), [=] (auto &objects, auto &next) { objects.reserve(limit); - return tp->submit([=, &objects, &next] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), + [=, &objects, &next] { auto c = static_cast(ch.get()); return store->collection_list(c->collection, start, end, store->get_ideal_list_max(), @@ -214,7 +215,7 @@ AlienStore::read(CollectionRef ch, { logger().debug("{}", __func__); return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) { - return tp->submit([=, &bl] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] { auto c = static_cast(ch.get()); return store->read(c->collection, oid, offset, len, bl, op_flags); }).then([&bl] (int r) -> read_errorator::future { @@ -238,7 +239,8 @@ AlienStore::readv(CollectionRef ch, logger().debug("{}", __func__); return seastar::do_with(ceph::bufferlist{}, [this, ch, oid, &m, op_flags](auto& bl) { - return tp->submit([this, ch, oid, &m, op_flags, &bl] { + return
tp->submit(ch->get_cid().hash_to_shard(tp->size()), + [this, ch, oid, &m, op_flags, &bl] { auto c = static_cast(ch.get()); return store->readv(c->collection, oid, m, bl, op_flags); }).then([&bl](int r) -> read_errorator::future { @@ -260,7 +262,7 @@ AlienStore::get_attr(CollectionRef ch, { logger().debug("{}", __func__); return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) { - return tp->submit([=, &value] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &value] { auto c =static_cast(ch.get()); return store->getattr(c->collection, oid, static_cast(name).c_str(), value); @@ -283,7 +285,7 @@ AlienStore::get_attrs(CollectionRef ch, { logger().debug("{}", __func__); return seastar::do_with(attrs_t{}, [=] (auto &aset) { - return tp->submit([=, &aset] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &aset] { auto c = static_cast(ch.get()); return store->getattrs(c->collection, oid, reinterpret_cast&>(aset)); @@ -304,7 +306,7 @@ auto AlienStore::omap_get_values(CollectionRef ch, { logger().debug("{}", __func__); return seastar::do_with(omap_values_t{}, [=] (auto &values) { - return tp->submit([=, &values] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] { auto c = static_cast(ch.get()); return store->omap_get_values(c->collection, oid, keys, reinterpret_cast*>(&values)); @@ -326,7 +328,7 @@ auto AlienStore::omap_get_values(CollectionRef ch, { logger().debug("{} with_start", __func__); return seastar::do_with(omap_values_t{}, [=] (auto &values) { - return tp->submit([=, &values] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] { auto c = static_cast(ch.get()); return store->omap_get_values(c->collection, oid, start, reinterpret_cast*>(&values)); @@ -360,7 +362,8 @@ seastar::future<> AlienStore::do_transaction(CollectionRef ch, return alien_coll->with_lock([this, ch, id, &txn, &done] { Context *crimson_wrapper = ceph::os::Transaction::collect_all_contexts(txn); - return 
tp->submit([this, ch, id, crimson_wrapper, &txn, &done] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), + [this, ch, id, crimson_wrapper, &txn, &done] { txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn)); auto c = static_cast(ch.get()); return store->queue_transaction(c->collection, std::move(txn)); @@ -436,7 +439,7 @@ seastar::future AlienStore::stat( const ghobject_t& oid) { return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) { - return tp->submit([this, ch, oid, &st] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [this, ch, oid, &st] { auto c = static_cast(ch.get()); store->stat(c->collection, oid, &st); return st; @@ -449,7 +452,7 @@ auto AlienStore::omap_get_header(CollectionRef ch, -> read_errorator::future { return seastar::do_with(ceph::bufferlist(), [=](auto& bl) { - return tp->submit([=, &bl] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] { auto c = static_cast(ch.get()); return store->omap_get_header(c->collection, oid, &bl); }).then([&bl] (int r) -> read_errorator::future { @@ -472,7 +475,7 @@ seastar::future> AlienStore::fiemap( uint64_t len) { return seastar::do_with(std::map(), [=](auto& destmap) { - return tp->submit([=, &destmap] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &destmap] { auto c = static_cast(ch.get()); return store->fiemap(c->collection, oid, off, len, destmap); }).then([&destmap] (int i) { @@ -487,12 +490,14 @@ seastar::future AlienStore::get_omap_iterator( CollectionRef ch, const ghobject_t& oid) { - return tp->submit([=] { + return tp->submit(ch->get_cid().hash_to_shard(tp->size()), + [this, ch, oid] { auto c = static_cast(ch.get()); auto iter = store->get_omap_iterator(c->collection, oid); return FuturizedStore::OmapIteratorRef( new AlienStore::AlienOmapIterator(iter, - this)); + this, + ch)); }); } @@ -500,7 +505,8 @@ seastar::future AlienStore::get_omap_iterator( // needs further optimization. 
seastar::future<> AlienStore::AlienOmapIterator::seek_to_first() { - return store->tp->submit([=] { + return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()), + [this] { return iter->seek_to_first(); }).then([] (int r) { assert(r == 0); @@ -511,7 +517,8 @@ seastar::future<> AlienStore::AlienOmapIterator::seek_to_first() seastar::future<> AlienStore::AlienOmapIterator::upper_bound( const std::string& after) { - return store->tp->submit([this, after] { + return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()), + [this, after] { return iter->upper_bound(after); }).then([] (int r) { assert(r == 0); @@ -522,7 +529,8 @@ seastar::future<> AlienStore::AlienOmapIterator::upper_bound( seastar::future<> AlienStore::AlienOmapIterator::lower_bound( const std::string& to) { - return store->tp->submit([this, to] { + return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()), + [this, to] { return iter->lower_bound(to); }).then([] (int r) { assert(r == 0); @@ -532,7 +540,8 @@ seastar::future<> AlienStore::AlienOmapIterator::lower_bound( seastar::future<> AlienStore::AlienOmapIterator::next() { - return store->tp->submit([this] { + return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()), + [this] { return iter->next(); }).then([] (int r) { assert(r == 0); diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h index c94da3ee580b9..08790c5fdfa6e 100644 --- a/src/crimson/os/alienstore/alien_store.h +++ b/src/crimson/os/alienstore/alien_store.h @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #pragma once @@ -24,7 +24,8 @@ public: class AlienOmapIterator final : public OmapIterator { public: AlienOmapIterator(ObjectMap::ObjectMapIterator& it, - AlienStore* store) : iter(it), store(store) {} + AlienStore* store, 
const CollectionRef& ch) + : iter(it), store(store), ch(ch) {} seastar::future<> seek_to_first(); seastar::future<> upper_bound(const std::string& after); seastar::future<> lower_bound(const std::string& to); @@ -36,6 +37,7 @@ public: private: ObjectMap::ObjectMapIterator iter; AlienStore* store; + CollectionRef ch; }; AlienStore(const std::string& path, const ConfigValues& values); ~AlienStore() final; diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc index c42947bfa2fb2..4b81dabd80250 100644 --- a/src/crimson/os/alienstore/thread_pool.cc +++ b/src/crimson/os/alienstore/thread_pool.cc @@ -16,16 +16,18 @@ namespace crimson::os { ThreadPool::ThreadPool(size_t n_threads, size_t queue_sz, std::vector cpus) - : queue_size{round_up_to(queue_sz, seastar::smp::count)}, - pending{queue_size} + : n_threads(n_threads), + queue_size{round_up_to(queue_sz, seastar::smp::count)}, + pending_queues(n_threads) { auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait); for (size_t i = 0; i < n_threads; i++) { - threads.emplace_back([this, cpus, queue_max_wait] { + threads.emplace_back([this, cpus, queue_max_wait, i] { if (!cpus.empty()) { pin(cpus); } - loop(queue_max_wait); + (void) pthread_setname_np(pthread_self(), "alien-store-tp"); + loop(queue_max_wait, i); }); } } @@ -49,17 +51,12 @@ void ThreadPool::pin(const std::vector& cpus) ceph_assert(r == 0); } -void ThreadPool::loop(std::chrono::milliseconds queue_max_wait) +void ThreadPool::loop(std::chrono::milliseconds queue_max_wait, size_t shard) { + auto& pending = pending_queues[shard]; for (;;) { WorkItem* work_item = nullptr; - { - std::unique_lock lock{mutex}; - cond.wait_for(lock, queue_max_wait, - [this, &work_item] { - return pending.pop(work_item) || is_stopping(); - }); - } + work_item = pending.pop_front(queue_max_wait); if (work_item) { work_item->process(); } else if (is_stopping()) { @@ -78,7 +75,9 @@ seastar::future<> 
ThreadPool::stop() { return submit_queue.stop().then([this] { stopping = true; - cond.notify_all(); + for (auto& q : pending_queues) { + q.stop(); + } }); } diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h index 8b66725dd7095..bbc1430f75a22 100644 --- a/src/crimson/os/alienstore/thread_pool.h +++ b/src/crimson/os/alienstore/thread_pool.h @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #pragma once #include @@ -72,17 +72,51 @@ struct SubmitQueue { } }; +struct ShardedWorkQueue { +public: + WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) { + WorkItem* work_item = nullptr; + std::unique_lock lock{mutex}; + cond.wait_for(lock, queue_max_wait, + [this, &work_item] { + bool empty = true; + if (!pending.empty()) { + empty = false; + work_item = pending.front(); + pending.pop_front(); + } + return !empty || is_stopping(); + }); + return work_item; + } + void stop() { + stopping = true; + cond.notify_all(); + } + void push_back(WorkItem* work_item) { + pending.push_back(work_item); + cond.notify_one(); + } +private: + bool is_stopping() const { + return stopping; + } + bool stopping = false; + std::mutex mutex; + std::condition_variable cond; + std::deque pending; +}; + /// an engine for scheduling non-seastar tasks from seastar fibers class ThreadPool { + size_t n_threads; std::atomic stopping = false; - std::mutex mutex; - std::condition_variable cond; std::vector threads; seastar::sharded submit_queue; const size_t queue_size; - boost::lockfree::queue pending; + std::vector pending_queues; - void loop(std::chrono::milliseconds queue_max_wait); + void loop(std::chrono::milliseconds queue_max_wait, size_t shard); bool is_stopping() const { return stopping.load(std::memory_order_relaxed); } @@ -106,20 +140,22 @@ public: ~ThreadPool(); 
seastar::future<> start(); seastar::future<> stop(); + size_t size() { + return n_threads; + } template - auto submit(Func&& func, Args&&... args) { + auto submit(int shard, Func&& func, Args&&... args) { auto packaged = [func=std::move(func), args=std::forward_as_tuple(args...)] { return std::apply(std::move(func), std::move(args)); }; return seastar::with_gate(submit_queue.local().pending_tasks, - [packaged=std::move(packaged), this] { + [packaged=std::move(packaged), shard, this] { return local_free_slots().wait() - .then([packaged=std::move(packaged), this] { + .then([packaged=std::move(packaged), shard, this] { auto task = new Task{std::move(packaged)}; auto fut = task->get_future(); - pending.push(task); - cond.notify_one(); + pending_queues[shard].push_back(task); return fut.finally([task, this] { local_free_slots().signal(); delete task; @@ -127,6 +163,12 @@ public: }); }); } + + template + auto submit(Func&& func) { + return submit(::rand() % n_threads, + std::forward(func)); + } }; } // namespace crimson::os diff --git a/src/test/crimson/test_alienstore_thread_pool.cc b/src/test/crimson/test_alienstore_thread_pool.cc index 82b98abbb60c8..7cfffec757b74 100644 --- a/src/test/crimson/test_alienstore_thread_pool.cc +++ b/src/test/crimson/test_alienstore_thread_pool.cc @@ -15,7 +15,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) { static constexpr auto N = 5; static constexpr auto M = 1; auto slow_plus = [&tp](int i) { - return tp.submit([=] { + return tp.submit(::rand() % 2, [=] { std::this_thread::sleep_for(10ns); return i + M; }); @@ -30,7 +30,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) { } seastar::future<> test_void_return(ThreadPool& tp) { - return tp.submit([=] { + return tp.submit(::rand() % 2, [=] { std::this_thread::sleep_for(10ns); }); } @@ -50,7 +50,7 @@ int main(int argc, char** argv) .then([conf_file_list] { return local_conf().parse_config_files(conf_file_list); }).then([] { - return seastar::do_with(std::make_unique(2, 128, 
0), + return seastar::do_with(std::make_unique(2, 128, (std::vector){0}), [](auto& tp) { return tp->start().then([&tp] { return test_accumulate(*tp); -- 2.39.5