From: Chunmei Liu Date: Wed, 1 Oct 2025 22:33:12 +0000 (+0000) Subject: crimson/os/alienstore: support multiple store shards on each reactor X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=cac81224c7e1f3145503dbe260c0d5b589240f2f;p=ceph-ci.git crimson/os/alienstore: support multiple store shards on each reactor Signed-off-by: Chunmei Liu --- diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc index 65b8f061e64..13e38f96f7d 100644 --- a/src/crimson/os/alienstore/alien_store.cc +++ b/src/crimson/os/alienstore/alien_store.cc @@ -76,7 +76,7 @@ AlienStore::AlienStore(const std::string& type, : type(type), path{path}, values(values), - op_gates() + main_op_gates() { } @@ -84,7 +84,7 @@ AlienStore::~AlienStore() { } -seastar::future<> AlienStore::start() +seastar::future AlienStore::start() { cct = std::make_unique( CEPH_ENTITY_TYPE_OSD, @@ -98,8 +98,8 @@ seastar::future<> AlienStore::start() cct->_conf.set_config_values(values); cct->_log->start(); - store = ObjectStore::create(cct.get(), type, path); - if (!store) { + main_store = ObjectStore::create(cct.get(), type, path); + if (!main_store) { ceph_abort_msgf("unsupported objectstore type: %s", type.c_str()); } /* @@ -118,33 +118,43 @@ seastar::future<> AlienStore::start() const auto num_threads = get_conf("crimson_bluestore_num_threads"); - tp = std::make_unique(num_threads, 128, alien_thread_cpu_cores); - return tp->start(); + main_tp = std::make_unique(num_threads, 128, alien_thread_cpu_cores); + return main_tp->start().then([this]() { + return shard_stores.start(main_tp.get(), + main_store.get(), + &main_op_gates, + &main_coll_map, + std::ref(main_coll_map_lock)); + }).then([] () { + return seastar::make_ready_future(seastar::smp::count); + }); } seastar::future<> AlienStore::stop() { - if (!tp) { + if (!main_tp) { // not really started yet return seastar::now(); } - return tp->submit([this] { - store.reset(); - cct.reset(); - g_ceph_context = nullptr; - - }).then([this] { - return tp->stop(); + return shard_stores.stop() + .then([this]() { + return main_tp->submit([this] { + main_store.reset(); + cct.reset(); + g_ceph_context = nullptr; + }).then([this] { + return main_tp->stop(); + }); }); } -AlienStore::base_errorator::future -AlienStore::exists( +AlienStore::Shard::base_errorator::future +AlienStore::Shard::exists( CollectionRef ch, const ghobject_t& oid, uint32_t op_flags) { - return op_gates.simple_dispatch("exists", [=, this] { + return (*op_gates).simple_dispatch("exists", [=, this] { return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, this] { auto c = static_cast(ch.get()); return store->exists(c->collection, oid); @@ -155,9 +165,9 @@ AlienStore::exists( AlienStore::mount_ertr::future<> AlienStore::mount() { logger().debug("{}", __func__); - assert(tp); - return tp->submit([this] { - return store->mount(); + assert(main_tp); + return main_tp->submit([this] { + return main_store->mount(); }).then([] (const int r) -> mount_ertr::future<> { if (r != 0) { return crimson::stateful_ec{ @@ -171,21 +181,21 @@ AlienStore::mount_ertr::future<> AlienStore::mount() seastar::future<> AlienStore::umount() { logger().info("{}", __func__); - if (!tp) { - // not really started yet + if (!main_tp) { return seastar::now(); } - return op_gates.close_all().then([this] { - return tp->submit([this] { + + return main_op_gates.close_all().then([this] { + return main_tp->submit([this] { { - std::lock_guard l(coll_map_lock); - for (auto [cid, ch]: coll_map) { - static_cast(ch.get())->collection.reset(); - } - coll_map.clear(); + std::lock_guard l(main_coll_map_lock); + for (auto [cid, ch]: main_coll_map) { + static_cast(ch.get())->collection.reset(); + } + main_coll_map.clear(); } - return store->umount(); - }).then([] (int r) { + return main_store->umount(); + }).then([](int r) { assert(r == 0); return seastar::now(); }); @@ -195,10 +205,10 @@ seastar::future<> AlienStore::umount() AlienStore::mkfs_ertr::future<> AlienStore::mkfs(uuid_d osd_fsid) { logger().debug("{}", __func__); - store->set_fsid(osd_fsid); - assert(tp); - return tp->submit([this] { - return store->mkfs(); + main_store->set_fsid(osd_fsid); + assert(main_tp); + return main_tp->submit([this] { + return main_store->mkfs(); }).then([] (int r) -> mkfs_ertr::future<> { if (r != 0) { return crimson::stateful_ec{ @@ -210,7 +220,7 @@ AlienStore::mkfs_ertr::future<> AlienStore::mkfs(uuid_d osd_fsid) } seastar::future, ghobject_t>> -AlienStore::list_objects(CollectionRef ch, +AlienStore::Shard::list_objects(CollectionRef ch, const ghobject_t& start, const ghobject_t& end, uint64_t limit, @@ -235,7 +245,7 @@ AlienStore::list_objects(CollectionRef ch, }); } -seastar::future AlienStore::create_new_collection(const coll_t& cid) +seastar::future AlienStore::Shard::create_new_collection(const coll_t& cid) { logger().debug("{}", __func__); assert(tp); @@ -245,7 +255,7 @@ seastar::future AlienStore::create_new_collection(const coll_t& c }); } -seastar::future AlienStore::open_collection(const coll_t& cid) +seastar::future AlienStore::Shard::open_collection(const coll_t& cid) { logger().debug("{}", __func__); assert(tp); @@ -260,6 +270,10 @@ seastar::future AlienStore::open_collection(const coll_t& cid) } seastar::future> AlienStore::list_collections() +{ + return shard_stores.local().mshard_stores->shard_list_collections(); +} +seastar::future> AlienStore::Shard::shard_list_collections() { logger().debug("{}", __func__); assert(tp); @@ -273,13 +287,13 @@ seastar::future> AlienStore::list_collections() ret.resize(ls.size()); std::transform( ls.begin(), ls.end(), ret.begin(), - [](auto p) { return std::make_pair(p, NULL_CORE); }); + [](auto p) { return std::make_pair(p, std::make_pair(NULL_CORE, NULL_STORE_INDEX)); }); return seastar::make_ready_future>(std::move(ret)); }); }); } -seastar::future<> AlienStore::set_collection_opts(CollectionRef ch, +seastar::future<> AlienStore::Shard::set_collection_opts(CollectionRef ch, const pool_opts_t& opts) { logger().debug("{}", __func__); @@ -294,8 +308,8 @@ seastar::future<> AlienStore::set_collection_opts(CollectionRef ch, }); } -AlienStore::read_errorator::future -AlienStore::read(CollectionRef ch, +AlienStore::Shard::read_errorator::future +AlienStore::Shard::read(CollectionRef ch, const ghobject_t& oid, uint64_t offset, size_t len, @@ -320,8 +334,8 @@ AlienStore::read(CollectionRef ch, }); } -AlienStore::read_errorator::future -AlienStore::readv(CollectionRef ch, +AlienStore::Shard::read_errorator::future +AlienStore::Shard::readv(CollectionRef ch, const ghobject_t& oid, interval_set& m, uint32_t op_flags) @@ -347,8 +361,8 @@ AlienStore::readv(CollectionRef ch, }); } -AlienStore::get_attr_errorator::future -AlienStore::get_attr(CollectionRef ch, +AlienStore::Shard::get_attr_errorator::future +AlienStore::Shard::get_attr(CollectionRef ch, const ghobject_t& oid, std::string_view name, uint32_t op_flags) const @@ -377,8 +391,8 @@ AlienStore::get_attr(CollectionRef ch, }); } -AlienStore::get_attrs_ertr::future -AlienStore::get_attrs(CollectionRef ch, +AlienStore::Shard::get_attrs_ertr::future +AlienStore::Shard::get_attrs(CollectionRef ch, const ghobject_t& oid, uint32_t op_flags) { @@ -399,7 +413,7 @@ AlienStore::get_attrs(CollectionRef ch, }); } -auto AlienStore::omap_get_values(CollectionRef ch, +auto AlienStore::Shard::omap_get_values(CollectionRef ch, const ghobject_t& oid, const set& keys, uint32_t op_flags) @@ -424,8 +438,8 @@ auto AlienStore::omap_get_values(CollectionRef ch, }); } -AlienStore::read_errorator::future -AlienStore::omap_iterate(CollectionRef ch, +AlienStore::Shard::read_errorator::future +AlienStore::Shard::omap_iterate(CollectionRef ch, const ghobject_t &oid, ObjectStore::omap_iter_seek_t start_from, omap_iterate_cb_t callback, @@ -453,7 +467,7 @@ AlienStore::omap_iterate(CollectionRef ch, }); } -seastar::future<> AlienStore::do_transaction_no_callbacks( +seastar::future<> AlienStore::Shard::do_transaction_no_callbacks( CollectionRef ch, ceph::os::Transaction&& txn) { @@ -481,22 +495,22 @@ seastar::future<> AlienStore::do_transaction_no_callbacks( }); } -seastar::future<> AlienStore::inject_data_error(const ghobject_t& o) +seastar::future<> AlienStore::Shard::inject_data_error(const ghobject_t& o) { logger().debug("{}", __func__); assert(tp); - return op_gates.simple_dispatch("inject_data_error", [=, this] { + return (*op_gates).simple_dispatch("inject_data_error", [=, this] { return tp->submit([o, this] { return store->inject_data_error(o); }); }); } -seastar::future<> AlienStore::inject_mdata_error(const ghobject_t& o) +seastar::future<> AlienStore::Shard::inject_mdata_error(const ghobject_t& o) { logger().debug("{}", __func__); assert(tp); - return op_gates.simple_dispatch("inject_mdata_error", [=, this] { + return (*op_gates).simple_dispatch("inject_mdata_error", [=, this] { return tp->submit([o, this] { return store->inject_mdata_error(o); }); @@ -507,10 +521,10 @@ seastar::future<> AlienStore::write_meta(const std::string& key, const std::string& value) { logger().debug("{}", __func__); - assert(tp); - return op_gates.simple_dispatch("write_meta", [=, this] { - return tp->submit([=, this] { - return store->write_meta(key, value); + assert(main_tp); + return main_op_gates.simple_dispatch("write_meta", [=, this] { + return main_tp->submit([=, this] { + return main_store->write_meta(key, value); }).then([] (int r) { assert(r == 0); return seastar::make_ready_future<>(); @@ -522,11 +536,11 @@ seastar::future> AlienStore::read_meta(const std::string& key) { logger().debug("{}", __func__); - assert(tp); - return op_gates.simple_dispatch("read_meta", [this, key] { - return tp->submit([key, this] { + assert(main_tp); + return main_op_gates.simple_dispatch("read_meta", [this, key] { + return main_tp->submit([key, this] { std::string value; - int r = store->read_meta(key, &value); + int r = main_store->read_meta(key, &value); if (r > 0) { value.resize(r); boost::algorithm::trim_right_if(value, @@ -545,10 +559,14 @@ AlienStore::read_meta(const std::string& key) uuid_d AlienStore::get_fsid() const { logger().debug("{}", __func__); - return store->get_fsid(); + return main_store->get_fsid(); } seastar::future AlienStore::stat() const +{ + return shard_stores.local().mshard_stores->shard_stat(); +} +seastar::future AlienStore::Shard::shard_stat() { logger().info("{}", __func__); assert(tp); @@ -563,6 +581,10 @@ seastar::future AlienStore::stat() const } seastar::future AlienStore::pool_statfs(int64_t pool_id) const +{ + return shard_stores.local().mshard_stores->shard_pool_statfs(pool_id); +} +seastar::future AlienStore::Shard::shard_pool_statfs(int64_t pool_id) { logger().info("{}", __func__); assert(tp); @@ -577,13 +599,13 @@ seastar::future AlienStore::pool_statfs(int64_t pool_id) const }); } -unsigned AlienStore::get_max_attr_name_length() const +unsigned AlienStore::Shard::get_max_attr_name_length() const { logger().info("{}", __func__); return 256; } -seastar::future AlienStore::stat( +seastar::future AlienStore::Shard::stat( CollectionRef ch, const ghobject_t& oid, uint32_t op_flags) @@ -601,17 +623,17 @@ seastar::future AlienStore::stat( seastar::future AlienStore::get_default_device_class() { logger().debug("{}", __func__); - assert(tp); - return op_gates.simple_dispatch("get_default_device_class", [=, this] { - return tp->submit([=, this] { - return store->get_default_device_class(); + assert(main_tp); + return main_op_gates.simple_dispatch("get_default_device_class", [=, this] { + return main_tp->submit([=, this] { + return main_store->get_default_device_class(); }).then([] (std::string device_class) { return seastar::make_ready_future(device_class); }); }); } -auto AlienStore::omap_get_header(CollectionRef ch, +auto AlienStore::Shard::omap_get_header(CollectionRef ch, const ghobject_t& oid, uint32_t op_flags) -> get_attr_errorator::future @@ -635,7 +657,7 @@ auto AlienStore::omap_get_header(CollectionRef ch, }); } -AlienStore::read_errorator::future> AlienStore::fiemap( +AlienStore::Shard::read_errorator::future> AlienStore::Shard::fiemap( CollectionRef ch, const ghobject_t& oid, uint64_t off, @@ -659,13 +681,13 @@ AlienStore::read_errorator::future> AlienStore::fie }); } -CollectionRef AlienStore::get_alien_coll_ref(ObjectStore::CollectionHandle c) { +CollectionRef AlienStore::Shard::get_alien_coll_ref(ObjectStore::CollectionHandle c) { std::lock_guard l(coll_map_lock); CollectionRef ch; - auto cp = coll_map.find(c->cid); - if (cp == coll_map.end()) { + auto cp = coll_map->find(c->cid); + if (cp == coll_map->end()) { ch = new AlienCollection(c); - coll_map[c->cid] = ch; + (*coll_map)[c->cid] = ch; } else { ch = cp->second; auto ach = static_cast(ch.get()); diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h index 613501fffe0..6c48c42ef55 100644 --- a/src/crimson/os/alienstore/alien_store.h +++ b/src/crimson/os/alienstore/alien_store.h @@ -5,6 +5,7 @@ #include #include +#include #include "common/ceph_context.h" #include "os/ObjectStore.h" @@ -21,129 +22,187 @@ class Transaction; namespace crimson::os { using coll_core_t = FuturizedStore::coll_core_t; -class AlienStore final : public FuturizedStore, - public FuturizedStore::Shard { +class AlienStore final : public FuturizedStore { + class Shard : public FuturizedStore::Shard { + public: + Shard(crimson::os::ThreadPool* _tp, + ObjectStore* _store, + crimson::common::gate_per_shard* _op_gates, + std::unordered_map* _coll_map, + std::mutex& _coll_map_lock) + : tp(_tp), + store(_store), + op_gates(_op_gates), + coll_map(_coll_map), + coll_map_lock(_coll_map_lock) {} + + ~Shard() = default; + + base_errorator::future exists( + CollectionRef c, + const ghobject_t& oid, + uint32_t op_flags = 0) final; + + read_errorator::future read(CollectionRef c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + uint32_t op_flags = 0) final; + read_errorator::future readv(CollectionRef c, + const ghobject_t& oid, + interval_set& m, + uint32_t op_flags = 0) final; + + get_attr_errorator::future get_attr( + CollectionRef c, + const ghobject_t& oid, + std::string_view name, + uint32_t op_flags = 0) const final; + get_attrs_ertr::future get_attrs( + CollectionRef c, + const ghobject_t& oid, + uint32_t op_flags = 0) final; + + read_errorator::future omap_get_values( + CollectionRef c, + const ghobject_t& oid, + const omap_keys_t& keys, + uint32_t op_flags = 0) final; + get_attr_errorator::future omap_get_header( + CollectionRef, + const ghobject_t&, + uint32_t) final; + read_errorator::future> fiemap( + CollectionRef, + const ghobject_t&, + uint64_t off, + uint64_t len, + uint32_t op_flags) final; + + seastar::future, ghobject_t>> list_objects( + CollectionRef c, + const ghobject_t& start, + const ghobject_t& end, + uint64_t limit, + uint32_t op_flags = 0) const final; + + read_errorator::future omap_iterate( + CollectionRef c, + const ghobject_t &oid, + ObjectStore::omap_iter_seek_t start_from, + omap_iterate_cb_t callback, + uint32_t op_flags = 0) final; + + seastar::future create_new_collection(const coll_t& cid) final; + seastar::future open_collection(const coll_t& cid) final; + seastar::future> shard_list_collections(); + seastar::future<> set_collection_opts(CollectionRef c, + const pool_opts_t& opts) final; + seastar::future shard_stat(); + seastar::future shard_pool_statfs(int64_t pool_id); + seastar::future stat( + CollectionRef, + const ghobject_t&, + uint32_t op_flags = 0) final; + + seastar::future<> do_transaction_no_callbacks( + CollectionRef c, + ceph::os::Transaction&& txn) final; + + // error injection + seastar::future<> inject_data_error(const ghobject_t& o) final; + seastar::future<> inject_mdata_error(const ghobject_t& o) final; + unsigned get_max_attr_name_length() const final; + + private: + CollectionRef get_alien_coll_ref(ObjectStore::CollectionHandle c); + + template + auto do_with_op_gate(Args&&... args) const { + return (*op_gates).simple_dispatch("AlienStore::do_with_op_gate", + // perfect forwarding in lambda's closure isn't available in C++17 + // using tuple as workaround; see: https://stackoverflow.com/a/49902823 + [args = std::make_tuple(std::forward(args)...)] () mutable { + return std::apply([] (auto&&... args) { + return seastar::do_with(std::forward(args)...); + }, std::move(args)); + }); + } + crimson::os::ThreadPool* tp = nullptr; //for each shard + ObjectStore* store = nullptr; //for each shard + crimson::common::gate_per_shard* op_gates = nullptr; //for per shard + std::unordered_map* coll_map = nullptr; // for per shard + std::mutex& coll_map_lock; + }; public: AlienStore(const std::string& type, const std::string& path, const ConfigValues& values); ~AlienStore() final; - seastar::future<> start() final; + seastar::future start() final; seastar::future<> stop() final; - mount_ertr::future<> mount() final; - seastar::future<> umount() final; - base_errorator::future exists( - CollectionRef c, - const ghobject_t& oid, - uint32_t op_flags = 0) final; mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final; - read_errorator::future read(CollectionRef c, - const ghobject_t& oid, - uint64_t offset, - size_t len, - uint32_t op_flags = 0) final; - read_errorator::future readv(CollectionRef c, - const ghobject_t& oid, - interval_set& m, - uint32_t op_flags = 0) final; - - - get_attr_errorator::future get_attr( - CollectionRef c, - const ghobject_t& oid, - std::string_view name, - uint32_t op_flags = 0) const final; - get_attrs_ertr::future get_attrs( - CollectionRef c, - const ghobject_t& oid, - uint32_t op_flags = 0) final; - - read_errorator::future omap_get_values( - CollectionRef c, - const ghobject_t& oid, - const omap_keys_t& keys, - uint32_t op_flags = 0) final; - - seastar::future, ghobject_t>> list_objects( - CollectionRef c, - const ghobject_t& start, - const ghobject_t& end, - uint64_t limit, - uint32_t op_flags = 0) const final; - - read_errorator::future omap_iterate( - CollectionRef c, - const ghobject_t &oid, - ObjectStore::omap_iter_seek_t start_from, - omap_iterate_cb_t callback, - uint32_t op_flags = 0) final; - - seastar::future create_new_collection(const coll_t& cid) final; - seastar::future open_collection(const coll_t& cid) final; - seastar::future> list_collections() final; - seastar::future<> set_collection_opts(CollectionRef c, - const pool_opts_t& opts) final; - - seastar::future<> do_transaction_no_callbacks( - CollectionRef c, - ceph::os::Transaction&& txn) final; - - // error injection - seastar::future<> inject_data_error(const ghobject_t& o) final; - seastar::future<> inject_mdata_error(const ghobject_t& o) final; + mount_ertr::future<> mount() final; + seastar::future<> umount() final; seastar::future<> write_meta(const std::string& key, const std::string& value) final; seastar::future> read_meta( const std::string& key) final; uuid_d get_fsid() const final; + seastar::future stat() const final; seastar::future pool_statfs(int64_t pool_id) const final; - unsigned get_max_attr_name_length() const final; - seastar::future stat( - CollectionRef, - const ghobject_t&, - uint32_t op_flags = 0) final; + seastar::future get_default_device_class() final; - get_attr_errorator::future omap_get_header( - CollectionRef, - const ghobject_t&, - uint32_t) final; - read_errorator::future> fiemap( - CollectionRef, - const ghobject_t&, - uint64_t off, - uint64_t len, - uint32_t op_flags) final; - - FuturizedStore::Shard& get_sharded_store() final { - return *this; - } -private: + seastar::future> list_collections() final; + + FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final { + return make_local_shared_foreign( + seastar::make_foreign(seastar::static_pointer_cast( + shard_stores.local().mshard_stores))); + } - template - auto do_with_op_gate(Args&&... args) const { - return op_gates.simple_dispatch("AlienStore::do_with_op_gate", - // perfect forwarding in lambda's closure isn't available in C++17 - // using tuple as workaround; see: https://stackoverflow.com/a/49902823 - [args = std::make_tuple(std::forward(args)...)] () mutable { - return std::apply([] (auto&&... args) { - return seastar::do_with(std::forward(args)...); - }, std::move(args)); - }); + std::vector get_sharded_stores() final { + std::vector ret; + ret.emplace_back(make_local_shared_foreign( + seastar::make_foreign(seastar::static_pointer_cast( + shard_stores.local().mshard_stores)))); + return ret; } - mutable std::unique_ptr tp; +private: + mutable std::unique_ptr main_tp; const std::string type; const std::string path; const ConfigValues values; uint64_t used_bytes = 0; - std::unique_ptr store; + std::unique_ptr main_store; std::unique_ptr cct; - mutable crimson::common::gate_per_shard op_gates; + mutable crimson::common::gate_per_shard main_op_gates; + + class MultiShardStores { + public: + seastar::shared_ptr mshard_stores; + + public: + MultiShardStores(crimson::os::ThreadPool* _tp, + ObjectStore* _store, + crimson::common::gate_per_shard* _op_gates, + std::unordered_map* _coll_map, + std::mutex& _coll_map_lock) + { + mshard_stores = seastar::make_shared( + _tp, _store, _op_gates, _coll_map, _coll_map_lock); + } + + ~MultiShardStores() {} + }; + + seastar::sharded shard_stores; /** * coll_map @@ -165,8 +224,9 @@ private: * coll_map is accessed exclusively from alien threadpool threads under the * coll_map_lock. */ - std::mutex coll_map_lock; - std::unordered_map coll_map; - CollectionRef get_alien_coll_ref(ObjectStore::CollectionHandle c); + std::mutex main_coll_map_lock; + std::unordered_map main_coll_map; + }; + }