From 7d336c615d2ce0a75c8b3c9e4149ddbd349512ef Mon Sep 17 00:00:00 2001
From: chunmei liu
Date: Tue, 15 Jul 2025 03:27:16 -0700
Subject: [PATCH] crimson/os/cyanstore: create multiple store shards on each
 reactor

Note: src/stop.sh should wait long enough before killing crimson-osd,
otherwise cyanstore may not get the chance to write its metadata to disk.

Signed-off-by: chunmei liu
---
 src/crimson/os/cyanstore/cyan_store.cc | 167 +++++++++++++++++++++----
 src/crimson/os/cyanstore/cyan_store.h  |  97 +++++++++-----
 2 files changed, 213 insertions(+), 51 deletions(-)

diff --git a/src/crimson/os/cyanstore/cyan_store.cc b/src/crimson/os/cyanstore/cyan_store.cc
index 9c3f5d6551c..a8a6ba43d86 100644
--- a/src/crimson/os/cyanstore/cyan_store.cc
+++ b/src/crimson/os/cyanstore/cyan_store.cc
@@ -54,19 +54,90 @@ private:
 };
 };
 
+namespace fs = std::filesystem;
+seastar::future<> CyanStore::get_shard_nums()
+{
+  store_shard_nums = 0;
+  for (const auto& entry : fs::directory_iterator(path)) {
+    const std::string filename = entry.path().filename().string();
+    if (filename.rfind("collections", 0) == 0) {
+      store_shard_nums++;
+    }
+  }
+  if (store_shard_nums == 0) {
+    // If no collections files found, assume seastar::smp::count shards
+    store_shard_nums = seastar::smp::count;
+  }
+  return seastar::make_ready_future<>();
+}
+
+seastar::future<unsigned int> CyanStore::start()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return get_shard_nums().then([this] {
+    auto num_shard_services = (store_shard_nums + seastar::smp::count - 1) / seastar::smp::count;
+    logger().info("store_shard_nums={} seastar::smp={}, num_shard_services={}",
+                  store_shard_nums, seastar::smp::count, num_shard_services);
+    return shard_stores.start(num_shard_services, path, store_shard_nums);
+  }).then([this] {
+    logger().debug("CyanStore started with {} shard stores", store_shard_nums);
+    return seastar::make_ready_future<unsigned int>(store_shard_nums);
+  });
+}
+
+seastar::future<> CyanStore::stop()
+{
+  logger().debug("stopping shard stores");
+  return shard_stores.stop();
+}
+
+CyanStore::mount_ertr::future<> CyanStore::mount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->mount().handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in CyanStore::mount"
+        });
+    });
+  });
+}
+
+seastar::future<> CyanStore::umount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->umount();
+    });
+  });
+}
+
 seastar::future<store_statfs_t> CyanStore::stat() const
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
   logger().debug("{}", __func__);
+
   return shard_stores.map_reduce0(
-    [](const CyanStore::Shard &local_store) {
-      return local_store.get_used_bytes();
+    [](const auto& local_store) {
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](const auto& mshard_store) {
+          return seastar::make_ready_future<uint64_t>(
+            mshard_store->get_used_bytes()
+          );
+        },
+        uint64_t{0},
+        std::plus<uint64_t>()
+      );
     },
-    (uint64_t)0,
+    uint64_t{0},
     std::plus<uint64_t>()
   ).then([](uint64_t used_bytes) {
     store_statfs_t st;
-    st.total = crimson::common::local_conf().get_val<Option::size_t>("memstore_device_bytes");
+    st.total = crimson::common::local_conf()
+      .get_val<Option::size_t>("memstore_device_bytes");
     st.available = st.total - used_bytes;
     return seastar::make_ready_future<store_statfs_t>(std::move(st));
   });
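The arithmetic above is the heart of the change: CyanStore::start() sizes each reactor with a ceiling division of store_shard_nums over seastar::smp::count, and the Shard constructor (later in this patch) deactivates any slot whose global index runs past store_shard_nums. A minimal standalone sketch of just that mapping; plain unsigned integers stand in for seastar::smp::count and seastar::this_shard_id(), and the helper names are illustrative, not part of the patch:

// shard_math_demo.cc -- illustrative only, mirrors the patch's arithmetic.
#include <cassert>
#include <cstdio>

// Shards each reactor must host: ceiling division, as in CyanStore::start().
unsigned shards_per_reactor(unsigned store_shard_nums, unsigned smp_count) {
  return (store_shard_nums + smp_count - 1) / smp_count;
}

// Mirrors CyanStore::Shard::Shard(): a slot is active only while its global
// index (reactor_id + smp_count * store_index) stays below store_shard_nums.
bool shard_is_active(unsigned reactor_id, unsigned store_index,
                     unsigned store_shard_nums, unsigned smp_count) {
  return reactor_id + smp_count * store_index < store_shard_nums;
}

int main() {
  const unsigned store_shard_nums = 5, smp_count = 2;
  assert(shards_per_reactor(store_shard_nums, smp_count) == 3);
  for (unsigned r = 0; r < smp_count; ++r) {
    for (unsigned i = 0; i < shards_per_reactor(store_shard_nums, smp_count); ++i) {
      std::printf("reactor %u, store_index %u -> %s\n", r, i,
                  shard_is_active(r, i, store_shard_nums, smp_count)
                    ? "active" : "inactive");
    }
  }
  return 0;
}

With 5 store shards on 2 reactors this prints 5 active slots and 1 inactive one, which is exactly the situation the store_active flag below guards against.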
@@ -109,15 +180,32 @@ CyanStore::mkfs_ertr::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
   }).safe_then([this]{
     return write_meta("type", "memstore");
   }).safe_then([this] {
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mkfs();
+    return shard_stores.invoke_on_all([](auto &local_store) {
+      return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+        return mshard_store->mkfs();
+      });
     });
   });
 }
 
+CyanStore::Shard::Shard(
+  std::string path,
+  unsigned int store_shard_nums,
+  unsigned int store_index)
+  : path(path),
+    store_index(store_index)
+{
+  ceph_assert(store_index < store_shard_nums);
+  if (seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) {
+    store_active = false;
+  }
+}
+
 seastar::future<> CyanStore::Shard::mkfs()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   std::string fn =
     path + "/collections" + std::to_string(seastar::this_shard_id());
   ceph::bufferlist bl;
@@ -131,16 +219,29 @@ seastar::future<std::vector<coll_core_t>>
 CyanStore::list_collections()
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
-  return seastar::do_with(std::vector<coll_core_t>{}, [this](auto &collections) {
-    return shard_stores.map([](auto &local_store) {
-      return local_store.list_collections();
-    }).then([&collections](std::vector<std::vector<coll_core_t>> results) {
-      for (auto& colls : results) {
-        collections.insert(collections.end(), colls.begin(), colls.end());
+  return shard_stores.map_reduce0(
+    [this](auto& local_store) {
+      // For each local store, collect all collections from its mshard_stores
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](auto& mshard_store) {
+          return mshard_store->list_collections();
+        },
+        std::vector<coll_core_t>(), // Initial empty vector
+        [](auto&& merged, auto&& result) { // Reduction function
+          merged.insert(merged.end(), result.begin(), result.end());
+          return std::move(merged);
         }
-      return seastar::make_ready_future<std::vector<coll_core_t>>(
-        std::move(collections));
-    });
+      );
+    },
+    std::vector<coll_core_t>(), // Initial empty vector for final reduction
+    [](auto&& total, auto&& shard_result) { // Final reduction function
+      total.insert(total.end(), shard_result.begin(), shard_result.end());
+      return std::move(total);
+    }
+  ).then([](auto all_collections) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
   });
 }
 
@@ -152,10 +253,13 @@ CyanStore::get_default_device_class()
 
 CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 {
+  if (!store_active) {
+    return mount_ertr::now();
+  }
   static const char read_file_errmsg[]{"read_file"};
   ceph::bufferlist bl;
   std::string fn =
-    path + "/collections" + std::to_string(seastar::this_shard_id());
+    path + "/collections" + std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index);
   std::string err;
   if (int r = bl.read_file(fn.c_str(), &err); r < 0) {
     return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
@@ -167,7 +271,7 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
   for (auto& coll : collections) {
     std::string fn = fmt::format("{}/{}{}", path, coll,
-      std::to_string(seastar::this_shard_id()));
+      std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
     ceph::bufferlist cbl;
     if (int r = cbl.read_file(fn.c_str(), &err); r < 0) {
       return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
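Every on-disk name in mount()/umount() above is now derived from the shard's global index (reactor id + smp::count * store_index), and the "collections<N>" files produced this way are what get_shard_nums() counts at the next startup. A standalone sketch of the naming rule; the function name and sample path are illustrative only:

// collections_name_demo.cc -- illustrative only.
#include <iostream>
#include <string>

std::string collections_file(const std::string& path, unsigned reactor_id,
                             unsigned smp_count, unsigned store_index) {
  // Same concatenation as Shard::mount()/umount() in the patch.
  return path + "/collections" +
         std::to_string(reactor_id + smp_count * store_index);
}

int main() {
  // With 2 reactors, the second store shard on reactor 1 persists "collections3".
  std::cout << collections_file("/var/lib/ceph/osd", 1, 2, 1) << "\n";
  return 0;
}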
@@ -183,6 +287,9 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 
 seastar::future<> CyanStore::Shard::umount()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   return seastar::do_with(std::set<coll_t>{}, [this](auto& collections) {
     return seastar::do_for_each(coll_map, [&collections, this](auto& coll) {
       auto& [col, ch] = coll;
@@ -191,13 +298,13 @@ seastar::future<> CyanStore::Shard::umount()
       ceph_assert(ch);
       ch->encode(bl);
       std::string fn = fmt::format("{}/{}{}", path, col,
-        std::to_string(seastar::this_shard_id()));
+        std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     }).then([&collections, this] {
       ceph::bufferlist bl;
       ceph::encode(collections, bl);
       std::string fn = fmt::format("{}/collections{}",
-        path, std::to_string(seastar::this_shard_id()));
+        path, std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     });
   });
@@ -211,6 +318,7 @@ CyanStore::Shard::list_objects(
   uint64_t limit,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {} {}",
                  __func__, c->get_cid(), start, end, limit);
@@ -234,6 +342,7 @@
 seastar::future<CollectionRef>
 CyanStore::Shard::create_new_collection(const coll_t& cid)
 {
+  assert(store_active);
   auto c = new Collection{cid};
   new_coll_map[cid] = c;
   return seastar::make_ready_future<CollectionRef>(c);
@@ -242,15 +351,19 @@ CyanStore::Shard::create_new_collection(const coll_t& cid)
 
 seastar::future<CollectionRef>
 CyanStore::Shard::open_collection(const coll_t& cid)
 {
+  assert(store_active);
   return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
 }
 
 seastar::future<std::vector<coll_core_t>>
 CyanStore::Shard::list_collections()
 {
+  if (!store_active) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>();
+  }
   std::vector<coll_core_t> collections;
   for (auto& coll : coll_map) {
-    collections.push_back(std::make_pair(coll.first, seastar::this_shard_id()));
+    collections.push_back(std::make_pair(coll.first, std::make_pair(seastar::this_shard_id(), store_index)));
   }
   return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(collections));
 }
@@ -261,6 +374,7 @@ CyanStore::Shard::exists(
   const ghobject_t &oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   if (!c->exists) {
     return base_errorator::make_ready_future<bool>(false);
@@ -276,6 +390,7 @@
 seastar::future<> CyanStore::Shard::set_collection_opts(CollectionRef ch,
                                                         const pool_opts_t& opts)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {}", __func__, c->get_cid());
   c->pool_opts = opts;
@@ -290,6 +405,7 @@ CyanStore::Shard::read(
   size_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {}~{}",
                  __func__, c->get_cid(), oid, offset, len);
@@ -317,6 +433,7 @@ CyanStore::Shard::readv(
   interval_set<uint64_t>& m,
   uint32_t op_flags)
 {
+  assert(store_active);
   return seastar::do_with(ceph::bufferlist{},
     [this, ch, oid, &m, op_flags](auto& bl) {
     return crimson::do_for_each(m,
@@ -338,6 +455,7 @@ CyanStore::Shard::get_attr(
   std::string_view name,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                  __func__, c->get_cid(), oid);
@@ -358,6 +476,7 @@ CyanStore::Shard::get_attrs(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                  __func__, c->get_cid(), oid);
@@ -375,6 +494,7 @@ auto CyanStore::Shard::omap_get_values(
   uint32_t op_flags)
   -> read_errorator::future<omap_values_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
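Shard::list_collections() above now tags each collection with both the reactor id and the store_index, which implies the coll_core_t alias in FuturizedStore is widened to carry that pair (the FuturizedStore side is not part of this patch). A standalone sketch of the shape callers would receive, using hypothetical placeholder types in place of coll_t and the real alias:

// coll_core_demo.cc -- illustrative only; placeholder types, not crimson's.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using FakeCollId = std::string;                      // stands in for coll_t
using ShardCore  = std::pair<unsigned, unsigned>;    // (reactor id, store_index)
using CollCore   = std::pair<FakeCollId, ShardCore>; // assumed shape of coll_core_t

int main() {
  const std::vector<CollCore> colls = {
    {"1.0_head", {0, 0}},
    {"1.1_head", {1, 2}},
  };
  for (const auto& [cid, core] : colls) {
    std::cout << cid << " -> reactor " << core.first
              << ", store shard " << core.second << "\n";
  }
  return 0;
}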
@@ -398,6 +518,7 @@ auto CyanStore::Shard::omap_iterate(
   uint32_t op_flags)
   -> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
@@ -423,6 +544,7 @@ auto CyanStore::Shard::omap_get_header(
   uint32_t op_flags)
   -> CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
@@ -437,6 +559,7 @@ seastar::future<> CyanStore::Shard::do_transaction_no_callbacks(
   CollectionRef ch,
   ceph::os::Transaction&& t)
 {
+  assert(store_active);
   using ceph::os::Transaction;
   int r = 0;
   try {
@@ -991,6 +1114,7 @@ CyanStore::Shard::fiemap(
   uint64_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
 
   ObjectRef o = c->get_object(oid);
@@ -1007,6 +1131,7 @@ CyanStore::Shard::stat(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
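The header changes below add a per-reactor MultiShardStores container that owns ceil(store_shard_nums / smp::count) Shard instances, some of which may be inactive on the last reactors. A standalone sketch of that layout; std::shared_ptr stands in for seastar::shared_ptr, a trivial struct stands in for CyanStore::Shard, and all names are illustrative:

// multi_shard_demo.cc -- illustrative only.
#include <cstdio>
#include <memory>
#include <string>
#include <vector>

struct FakeShard {
  std::string path;
  unsigned store_index;
  bool active;
  FakeShard(std::string p, unsigned store_shard_nums, unsigned smp_count,
            unsigned reactor_id, unsigned index)
    : path(std::move(p)), store_index(index),
      active(reactor_id + smp_count * index < store_shard_nums) {}
};

struct FakeMultiShardStores {
  std::vector<std::shared_ptr<FakeShard>> mshard_stores;
  FakeMultiShardStores(std::size_t count, const std::string& path,
                       unsigned store_shard_nums, unsigned smp_count,
                       unsigned reactor_id) {
    mshard_stores.reserve(count);
    for (unsigned i = 0; i < count; ++i) {
      mshard_stores.push_back(std::make_shared<FakeShard>(
        path, store_shard_nums, smp_count, reactor_id, i));
    }
  }
};

int main() {
  const unsigned store_shard_nums = 5, smp_count = 2, reactor_id = 1;
  const unsigned per_reactor = (store_shard_nums + smp_count - 1) / smp_count;
  FakeMultiShardStores stores(per_reactor, "/tmp/cyan", store_shard_nums,
                              smp_count, reactor_id);
  for (const auto& s : stores.mshard_stores) {
    std::printf("store_index %u: %s\n", s->store_index,
                s->active ? "active" : "inactive");
  }
  return 0;
}

Unlike the real CyanStore::Shard, the fake constructor receives the reactor id and smp count explicitly instead of reading seastar globals; the containment and sizing are the point of the sketch.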
diff --git a/src/crimson/os/cyanstore/cyan_store.h b/src/crimson/os/cyanstore/cyan_store.h
index f42aa63a4fe..fd68da3ff74 100644
--- a/src/crimson/os/cyanstore/cyan_store.h
+++ b/src/crimson/os/cyanstore/cyan_store.h
@@ -12,6 +12,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include "osd/osd_types.h"
 #include "include/uuid.h"
@@ -29,8 +31,10 @@ class CyanStore final : public FuturizedStore {
 public:
   class Shard : public FuturizedStore::Shard {
   public:
-    Shard(std::string path)
-      :path(path){}
+    Shard(std::string path,
+          unsigned int store_shard_nums,
+          unsigned int store_index = 0);
+    ~Shard() = default;
 
     seastar::future<struct stat> stat(
       CollectionRef c,
@@ -128,7 +132,19 @@ public:
     using coll_core_t = FuturizedStore::coll_core_t;
     seastar::future<std::vector<coll_core_t>> list_collections();
 
-    uint64_t get_used_bytes() const { return used_bytes; }
+    uint64_t get_used_bytes() const {
+      if (!store_active) {
+        return 0;
+      }
+      return used_bytes;
+    }
+
+    unsigned int get_store_index() const {
+      return store_index;
+    }
+    bool get_status() const {
+      return store_active;
+    }
 
   private:
     int _remove(const coll_t& cid, const ghobject_t& oid);
@@ -175,38 +191,20 @@ public:
     const std::string path;
     std::unordered_map<coll_t, boost::intrusive_ptr<Collection>> coll_map;
     std::map<coll_t, boost::intrusive_ptr<Collection>> new_coll_map;
+    unsigned int store_index;
+    bool store_active = true;
   };
 
   CyanStore(const std::string& path);
   ~CyanStore() final;
 
-  seastar::future<> start() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.start(path);
-  }
+  seastar::future<unsigned int> start() final;
 
-  seastar::future<> stop() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.stop();
-  }
+  seastar::future<> stop() final;
 
-  mount_ertr::future<> mount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mount().handle_error(
-        crimson::stateful_ec::assert_failure(
-          fmt::format("error mounting cyanstore").c_str()));
-    });
-  }
+  mount_ertr::future<> mount() final;
 
-  seastar::future<> umount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.umount();
-    });
-  }
+  seastar::future<> umount() final;
 
   mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
 
@@ -219,8 +217,24 @@ public:
   seastar::future<> write_meta(const std::string& key,
                                const std::string& value) final;
 
-  FuturizedStore::Shard& get_sharded_store() final{
-    return shard_stores.local();
+  FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+    assert(!shard_stores.local().mshard_stores.empty());
+    assert(store_index < shard_stores.local().mshard_stores.size());
+    assert(shard_stores.local().mshard_stores[store_index]->get_status() == true);
+    return make_local_shared_foreign(
+      seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+        shard_stores.local().mshard_stores[store_index])));
+  }
+  std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final {
+    std::vector<FuturizedStore::StoreShardRef> ret;
+    ret.reserve(shard_stores.local().mshard_stores.size());
+    for (auto& mshard_store : shard_stores.local().mshard_stores) {
+      if (mshard_store->get_status() == true) {
+        ret.emplace_back(make_local_shared_foreign(
+          seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(mshard_store))));
+      }
+    }
+    return ret;
   }
 
   seastar::future<std::vector<coll_core_t>>
@@ -230,8 +244,31 @@ public:
   seastar::future<std::string> get_default_device_class() final;
 
+  seastar::future<> get_shard_nums();
+
+
 private:
-  seastar::sharded<CyanStore::Shard> shard_stores;
+  class MultiShardStores {
+  public:
+    std::vector<seastar::shared_ptr<CyanStore::Shard>> mshard_stores;
+
+  public:
+    MultiShardStores(size_t count,
+                     const std::string path,
+                     unsigned int store_shard_nums)
+      : mshard_stores() {
+      mshard_stores.reserve(count); // Reserve space for the shards
+      for (size_t store_index = 0; store_index < count; ++store_index) {
+        mshard_stores.emplace_back(seastar::make_shared<CyanStore::Shard>(
+          path, store_shard_nums, store_index));
+      }
+    }
+    ~MultiShardStores() {
+      mshard_stores.clear();
+    }
+  };
+  seastar::sharded<MultiShardStores> shard_stores;
+  unsigned int store_shard_nums = 0;
   const std::string path;
   uuid_d osd_fsid;
 };
-- 
2.39.5
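Putting the pieces together: after a clean shutdown, umount() leaves one "collections<N>" file per active store shard, and CyanStore::get_shard_nums() counts files whose names start with "collections" on the next start() to recover store_shard_nums. A standalone sketch of that round trip using std::filesystem; the temporary directory and file contents are illustrative only:

// shard_count_demo.cc -- illustrative only.
#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// Same predicate as CyanStore::get_shard_nums(): count names that start
// with "collections".
unsigned count_collection_files(const fs::path& dir) {
  unsigned n = 0;
  for (const auto& entry : fs::directory_iterator(dir)) {
    const std::string name = entry.path().filename().string();
    if (name.rfind("collections", 0) == 0) {
      ++n;
    }
  }
  return n;
}

int main() {
  const fs::path dir = fs::temp_directory_path() / "cyanstore_demo";
  fs::create_directories(dir);
  for (unsigned i = 0; i < 5; ++i) {          // pretend five active store shards
    std::ofstream(dir / ("collections" + std::to_string(i))) << "stub";
  }
  std::cout << "recovered store_shard_nums = "
            << count_collection_files(dir) << "\n";  // prints 5
  fs::remove_all(dir);
  return 0;
}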