return seastar::do_with(std::vector<ghobject_t>(), ghobject_t(),
[=] (auto &objects, auto &next) {
objects.reserve(limit);
- return tp->submit([=, &objects, &next] {
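+ // route the call to the worker shard that owns this collection (cid hash),
+ // so every op on the same collection lands in the same per-shard queue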
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+ [=, &objects, &next] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->collection_list(c->collection, start, end,
store->get_ideal_list_max(),
{
logger().debug("{}", __func__);
return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) {
- return tp->submit([=, &bl] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->read(c->collection, oid, offset, len, bl, op_flags);
}).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
logger().debug("{}", __func__);
return seastar::do_with(ceph::bufferlist{},
[this, ch, oid, &m, op_flags](auto& bl) {
- return tp->submit([this, ch, oid, &m, op_flags, &bl] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+ [this, ch, oid, &m, op_flags, &bl] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->readv(c->collection, oid, m, bl, op_flags);
}).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
{
logger().debug("{}", __func__);
return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) {
- return tp->submit([=, &value] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &value] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->getattr(c->collection, oid,
static_cast<std::string>(name).c_str(), value);
{
logger().debug("{}", __func__);
return seastar::do_with(attrs_t{}, [=] (auto &aset) {
- return tp->submit([=, &aset] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &aset] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->getattrs(c->collection, oid,
reinterpret_cast<map<string,bufferptr>&>(aset));
{
logger().debug("{}", __func__);
return seastar::do_with(omap_values_t{}, [=] (auto &values) {
- return tp->submit([=, &values] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->omap_get_values(c->collection, oid, keys,
reinterpret_cast<map<string, bufferlist>*>(&values));
{
logger().debug("{} with_start", __func__);
return seastar::do_with(omap_values_t{}, [=] (auto &values) {
- return tp->submit([=, &values] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->omap_get_values(c->collection, oid, start,
reinterpret_cast<map<string, bufferlist>*>(&values));
return alien_coll->with_lock([this, ch, id, &txn, &done] {
Context *crimson_wrapper =
ceph::os::Transaction::collect_all_contexts(txn);
- return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
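+ // transactions are likewise keyed by the collection id, so all transactions
+ // on one collection are pushed to the same shard queue in submission order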
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+ [this, ch, id, crimson_wrapper, &txn, &done] {
txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
auto c = static_cast<AlienCollection*>(ch.get());
return store->queue_transaction(c->collection, std::move(txn));
const ghobject_t& oid)
{
return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
- return tp->submit([this, ch, oid, &st] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [this, ch, oid, &st] {
auto c = static_cast<AlienCollection*>(ch.get());
store->stat(c->collection, oid, &st);
return st;
-> read_errorator::future<ceph::bufferlist>
{
return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
- return tp->submit([=, &bl] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->omap_get_header(c->collection, oid, &bl);
}).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
uint64_t len)
{
return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
- return tp->submit([=, &destmap] {
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &destmap] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->fiemap(c->collection, oid, off, len, destmap);
}).then([&destmap] (int i) {
CollectionRef ch,
const ghobject_t& oid)
{
- return tp->submit([=] {
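+ // the iterator now keeps the CollectionRef so its own methods can route
+ // follow-up calls to the same shard (see AlienOmapIterator below)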
+ return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+ [this, ch, oid] {
auto c = static_cast<AlienCollection*>(ch.get());
auto iter = store->get_omap_iterator(c->collection, oid);
return FuturizedStore::OmapIteratorRef(
new AlienStore::AlienOmapIterator(iter,
- this));
+ this,
+ ch));
});
}
// needs further optimization.
seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
{
- return store->tp->submit([=] {
+ return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+ [this] {
return iter->seek_to_first();
}).then([] (int r) {
assert(r == 0);
seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
const std::string& after)
{
- return store->tp->submit([this, after] {
+ return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+ [this, after] {
return iter->upper_bound(after);
}).then([] (int r) {
assert(r == 0);
seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
const std::string& to)
{
- return store->tp->submit([this, to] {
+ return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+ [this, to] {
return iter->lower_bound(to);
}).then([] (int r) {
assert(r == 0);
seastar::future<> AlienStore::AlienOmapIterator::next()
{
- return store->tp->submit([this] {
+ return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+ [this] {
return iter->next();
}).then([] (int r) {
assert(r == 0);
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include <atomic>
}
};
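+// A single worker thread's queue of WorkItems: the seastar side pushes,
+// the alien worker pops, blocking for at most queue_max_wait per attempt.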
+struct ShardedWorkQueue {
+public:
+ WorkItem* pop_front(const std::chrono::milliseconds& queue_max_wait) {
+ WorkItem* work_item = nullptr;
+ std::unique_lock lock{mutex};
+ cond.wait_for(lock, queue_max_wait,
+ [this, &work_item] {
+ bool empty = true;
+ if (!pending.empty()) {
+ empty = false;
+ work_item = pending.front();
+ pending.pop_front();
+ }
+ return !empty || is_stopping();
+ });
+ return work_item;
+ }
+ void stop() {
+ std::unique_lock lock{mutex};
+ stopping = true;
+ cond.notify_all();
+ }
+ void push_back(WorkItem* work_item) {
+ // take the lock: pending is popped concurrently by the worker in pop_front()
+ std::unique_lock lock{mutex};
+ pending.push_back(work_item);
+ cond.notify_one();
+ }
+private:
+ bool is_stopping() const {
+ return stopping;
+ }
+ bool stopping = false;
+ std::mutex mutex;
+ std::condition_variable cond;
+ std::deque<WorkItem*> pending;
+};
+
/// an engine for scheduling non-seastar tasks from seastar fibers
class ThreadPool {
+ size_t n_threads;
std::atomic<bool> stopping = false;
- std::mutex mutex;
- std::condition_variable cond;
std::vector<std::thread> threads;
seastar::sharded<SubmitQueue> submit_queue;
const size_t queue_size;
- boost::lockfree::queue<WorkItem*> pending;
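+ // one queue per worker thread, indexed by the shard passed to submit()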
+ std::vector<ShardedWorkQueue> pending_queues;
- void loop(std::chrono::milliseconds queue_max_wait);
+ void loop(std::chrono::milliseconds queue_max_wait, size_t shard);
bool is_stopping() const {
return stopping.load(std::memory_order_relaxed);
}
~ThreadPool();
seastar::future<> start();
seastar::future<> stop();
+ size_t size() const {
+ return n_threads;
+ }
template<typename Func, typename...Args>
- auto submit(Func&& func, Args&&... args) {
+ auto submit(int shard, Func&& func, Args&&... args) {
auto packaged = [func=std::move(func),
args=std::forward_as_tuple(args...)] {
return std::apply(std::move(func), std::move(args));
};
return seastar::with_gate(submit_queue.local().pending_tasks,
- [packaged=std::move(packaged), this] {
+ [packaged=std::move(packaged), shard, this] {
return local_free_slots().wait()
- .then([packaged=std::move(packaged), this] {
+ .then([packaged=std::move(packaged), shard, this] {
auto task = new Task{std::move(packaged)};
auto fut = task->get_future();
- pending.push(task);
- cond.notify_one();
+ pending_queues[shard].push_back(task);
return fut.finally([task, this] {
local_free_slots().signal();
delete task;
});
});
}
+
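+ // convenience overload: no shard affinity required, so pick a shard
+ // pseudo-randomly to spread unordered work across the worker threads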
+ template<typename Func>
+ auto submit(Func&& func) {
+ return submit(::rand() % n_threads,
+ std::forward<Func>(func));
+ }
};
} // namespace crimson::os
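
For reference, a minimal sketch of how each worker's loop() could drain its ShardedWorkQueue under this scheme. The real body lives in thread_pool.cc and is not part of this hunk; the sketch only assumes the interfaces declared above plus a WorkItem::process() hook, which the Task wrapper used by submit() is expected to provide:

    void ThreadPool::loop(std::chrono::milliseconds queue_max_wait, size_t shard)
    {
      auto& queue = pending_queues[shard];
      for (;;) {
        // blocks for up to queue_max_wait; nullptr means timeout or shutdown
        WorkItem* work_item = queue.pop_front(queue_max_wait);
        if (work_item) {
          work_item->process();
        } else if (is_stopping()) {
          break;
        }
      }
    }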