From b9f289187adef31d63fbeb2fb33909871c6b6c66 Mon Sep 17 00:00:00 2001 From: Chunmei Liu Date: Fri, 17 Oct 2025 23:15:40 +0000 Subject: [PATCH] crimson/os/seastore: create multiple device shards and store shards on each reactor. Signed-off-by: Chunmei Liu --- src/crimson/os/futurized_store.h | 17 +- src/crimson/os/seastore/device.h | 8 +- src/crimson/os/seastore/seastore.cc | 324 +++++++++++++----- src/crimson/os/seastore/seastore.h | 70 +++- src/crimson/os/seastore/segment_manager.h | 3 +- .../os/seastore/segment_manager/block.cc | 107 ++++-- .../os/seastore/segment_manager/block.h | 47 ++- 7 files changed, 438 insertions(+), 138 deletions(-) diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index 1ef8db7422e..b2fa51bea06 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -9,8 +9,11 @@ #include #include +#include +#include #include "os/Transaction.h" +#include "crimson/common/local_shared_foreign_ptr.h" #include "crimson/common/smp_helpers.h" #include "crimson/common/smp_helpers.h" #include "crimson/osd/exceptions.h" @@ -205,7 +208,7 @@ public: explicit FuturizedStore(const FuturizedStore& o) = delete; const FuturizedStore& operator=(const FuturizedStore& o) = delete; - virtual seastar::future<> start() = 0; + virtual seastar::future start() = 0; virtual seastar::future<> stop() = 0; @@ -227,13 +230,21 @@ public: virtual seastar::future<> write_meta(const std::string& key, const std::string& value) = 0; + + using StoreShardLRef = seastar::shared_ptr; + using StoreShardFRef = seastar::foreign_ptr; + using StoreShardRef = ::crimson::local_shared_foreign_ptr; + using StoreShardFFRef = seastar::foreign_ptr; + using StoreShardXcoreRef = ::crimson::local_shared_foreign_ptr; + // called on the shard and get this FuturizedStore::shard; - virtual Shard& get_sharded_store() = 0; + virtual StoreShardRef get_sharded_store(unsigned int store_index = 0) = 0; + virtual std::vector get_sharded_stores() = 0; virtual seastar::future> read_meta( const std::string& key) = 0; - using coll_core_t = std::pair; + using coll_core_t = std::pair>; virtual seastar::future> list_collections() = 0; virtual seastar::future get_default_device_class() = 0; diff --git a/src/crimson/os/seastore/device.h b/src/crimson/os/seastore/device.h index 5ef1b642171..19c63404bbd 100644 --- a/src/crimson/os/seastore/device.h +++ b/src/crimson/os/seastore/device.h @@ -90,15 +90,16 @@ class Device { public: virtual ~Device() {} - virtual seastar::future<> start() { + virtual seastar::future<> start(unsigned int shard_nums) { return seastar::now(); } virtual seastar::future<> stop() { return seastar::now(); } + // called on the shard to get this shard device; - virtual Device& get_sharded_device() { + virtual Device& get_sharded_device(unsigned int store_index = 0) { return *this; } @@ -166,6 +167,9 @@ public: return read_ertr::make_ready_future(std::move(*ptrref)); }); } + virtual read_ertr::future get_shard_nums() { + return read_ertr::make_ready_future(seastar::smp::count); + } }; using check_create_device_ertr = Device::access_ertr; diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 89b0843438d..b8329b077c7 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -127,16 +127,23 @@ using crimson::common::get_conf; SeaStore::Shard::Shard( std::string root, Device* dev, - bool is_test) + bool is_test, + unsigned int store_shard_nums, + unsigned int store_index) :root(root), max_object_size( get_conf("seastore_default_max_object_size")), is_test(is_test), throttler( - get_conf("seastore_max_concurrent_transactions")) + get_conf("seastore_max_concurrent_transactions")), + store_index(store_index) { - device = &(dev->get_sharded_device()); - register_metrics(); + if(seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) { + store_active = false; + } + device = &(dev->get_sharded_device(store_index)); + + register_metrics(store_index); } SeaStore::SeaStore( @@ -145,12 +152,16 @@ SeaStore::SeaStore( : root(root), mdstore(std::move(mdstore)) { + store_shard_nums = seastar::smp::count; } SeaStore::~SeaStore() = default; -void SeaStore::Shard::register_metrics() +void SeaStore::Shard::register_metrics(unsigned int store_index) { + if(!store_active) { + return; + } namespace sm = seastar::metrics; using op_type_t = crimson::os::seastore::op_type_t; std::pair labels_by_op_type[] = { @@ -174,7 +185,7 @@ void SeaStore::Shard::register_metrics() return get_latency(op_type); }, sm::description(desc), - {label} + {label, sm::label_instance("shard_store_index", std::to_string(store_index))} ), } ); @@ -188,21 +199,60 @@ void SeaStore::Shard::register_metrics() [this] { return throttler.get_current(); }, - sm::description("transactions that are running inside seastore") + sm::description("transactions that are running inside seastore"), + {sm::label_instance("shard_store_index", std::to_string(store_index))} ), sm::make_gauge( "pending_transactions", [this] { return throttler.get_pending(); }, - sm::description("transactions waiting to get " - "through seastore's throttler") + sm::description("transactions waiting to get " + "through seastore's throttler"), + {sm::label_instance("shard_store_index", std::to_string(store_index))} ) } ); } -seastar::future<> SeaStore::start() +seastar::future<> SeaStore::get_shard_nums() +{ + LOG_PREFIX(SeaStore::get_shard_nums); + auto tuple = co_await read_meta("mkfs_done"); + auto [done, value] = tuple; + if (done == -1) { + INFO("seastore not mkfs yet"); + store_shard_nums = seastar::smp::count; + co_return; + } else { + INFO("seastore mkfs done"); + auto shard_nums = co_await device->get_shard_nums( + ).handle_error( + crimson::ct_error::assert_all{ + "Invalid error in device->get_shard_nums" + }); + INFO("seastore shard nums {}", shard_nums); + store_shard_nums = shard_nums; + co_return; + } +} + +seastar::future<> SeaStore::shard_stores_start(bool is_test) +{ + LOG_PREFIX(SeaStore::shard_stores_start); + auto num_shard_services = (store_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count; + INFO("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services); + return shard_stores.start(num_shard_services, root, device.get(), is_test, store_shard_nums); +} + +seastar::future<> SeaStore::shard_stores_stop() +{ + LOG_PREFIX(SeaStore::shard_stores_stop); + INFO("stopping shard stores"); + return shard_stores.stop(); +} + +seastar::future SeaStore::start() { LOG_PREFIX(SeaStore::start); INFO("..."); @@ -222,10 +272,12 @@ seastar::future<> SeaStore::start() ceph_assert(root != ""); DeviceRef device_obj = co_await Device::make_device(root, d_type); device = std::move(device_obj); - co_await device->start(); + co_await get_shard_nums(); + co_await device->start(store_shard_nums); ceph_assert(device); - co_await shard_stores.start(root, device.get(), is_test); + co_await shard_stores_start(is_test);; INFO("done"); + co_return store_shard_nums; } seastar::future<> SeaStore::test_start(DeviceRef device_obj) @@ -236,7 +288,7 @@ seastar::future<> SeaStore::test_start(DeviceRef device_obj) ceph_assert(device_obj); ceph_assert(root == ""); device = std::move(device_obj); - co_await shard_stores.start_single(root, device.get(), true); + co_await shard_stores.start_single(1, root, device.get(), true, seastar::smp::count); INFO("done"); } @@ -253,7 +305,7 @@ seastar::future<> SeaStore::stop() if (device) { co_await device->stop(); } - co_await shard_stores.stop(); + co_await shard_stores_stop(); INFO("done"); } @@ -263,7 +315,9 @@ SeaStore::mount_ertr::future<> SeaStore::test_mount() INFO("..."); ceph_assert(seastar::this_shard_id() == primary_core); - co_await shard_stores.local().mount_managers(); + co_await seastar::do_for_each(shard_stores.local().mshard_stores, [this](auto& mshard_store) { + return mshard_store->mount_managers(); + }); INFO("done"); } @@ -274,30 +328,35 @@ Device::access_ertr::future<> SeaStore::_mount() ceph_assert(seastar::this_shard_id() == primary_core); co_await device->mount(); - ceph_assert(device->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE); + ceph_assert(device->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE); - auto &sec_devices = device->get_sharded_device().get_secondary_devices(); + auto &sec_devices = device->get_sharded_device(0).get_secondary_devices(); for (auto& device_entry : sec_devices) { device_id_t id = device_entry.first; [[maybe_unused]] magic_t magic = device_entry.second.magic; device_type_t dtype = device_entry.second.dtype; std::string path = fmt::format("{}/block.{}.{}", root, dtype, std::to_string(id)); DeviceRef sec_dev = co_await Device::make_device(path, dtype); - co_await sec_dev->start(); + co_await sec_dev->start(store_shard_nums); co_await sec_dev->mount(); - ceph_assert(sec_dev->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE); - assert(sec_dev->get_sharded_device().get_magic() == magic); + ceph_assert(sec_dev->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE); + assert(sec_dev->get_sharded_device(0).get_magic() == magic); secondaries.emplace_back(std::move(sec_dev)); co_await set_secondaries(); } co_await shard_stores.invoke_on_all([](auto &local_store) { - return local_store.mount_managers(); + return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) { + return mshard_store->mount_managers(); + }); }); INFO("done"); } seastar::future<> SeaStore::Shard::mount_managers() { + if(!store_active) { + return seastar::now(); + } LOG_PREFIX(SeaStore::mount_managers); INFO("start"); init_managers(); @@ -315,15 +374,21 @@ seastar::future<> SeaStore::umount() ceph_assert(seastar::this_shard_id() == primary_core); co_await shard_stores.invoke_on_all([](auto &local_store) { - return local_store.umount().handle_error( - crimson::ct_error::assert_all{"Invalid error in SeaStoreS::umount"} - ); + return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) { + return mshard_store->umount().handle_error( + crimson::ct_error::assert_all{ + "Invalid error in shard_store->umount" + }); + }); }); INFO("done"); } base_ertr::future<> SeaStore::Shard::umount() { + if(!store_active) { + co_return; + } if (transaction_manager) { co_await transaction_manager->close(); } @@ -357,6 +422,9 @@ SeaStore::Shard::mkfs_managers() { LOG_PREFIX(SeaStoreS::mkfs_managers); INFO("..."); + if(!store_active) { + co_return; + } init_managers(); co_await transaction_manager->mkfs(); init_managers(); @@ -386,8 +454,12 @@ seastar::future<> SeaStore::set_secondaries() { auto sec_dev_ite = secondaries.rbegin(); Device* sec_dev = sec_dev_ite->get(); + return shard_stores.invoke_on_all([sec_dev](auto &local_store) { - local_store.set_secondaries(sec_dev->get_sharded_device()); + return seastar::do_for_each(local_store.mshard_stores, [sec_dev](auto& mshard_store) { + unsigned int index = mshard_store->get_store_index(); + mshard_store->set_secondaries(sec_dev->get_sharded_device(index)); + }); }); } @@ -403,7 +475,7 @@ SeaStore::mkfs_ertr::future<> SeaStore::test_mkfs(uuid_d new_osd_fsid) ERROR("failed"); co_return; } - co_await shard_stores.local().mkfs_managers().handle_error( + co_await shard_stores.local().mshard_stores[0]->mkfs_managers().handle_error( crimson::ct_error::assert_all{"Invalid error in SeaStore::mkfs"}); co_await prepare_meta(new_osd_fsid); INFO("done"); @@ -462,7 +534,7 @@ Device::access_ertr::future<> SeaStore::_mkfs(uuid_d new_osd_fsid) DeviceRef sec_dev = co_await Device::make_device(path, dtype); auto p_sec_dev = sec_dev.get(); secondaries.emplace_back(std::move(sec_dev)); - co_await p_sec_dev->start(); + co_await p_sec_dev->start(store_shard_nums); magic_t magic = (magic_t)std::rand(); sds.emplace((device_id_t)id, device_spec_t{magic, dtype, (device_id_t)id}); co_await p_sec_dev->mkfs( @@ -491,8 +563,10 @@ Device::access_ertr::future<> SeaStore::_mkfs(uuid_d new_osd_fsid) co_await device->mount(); DEBUG("mkfs managers"); co_await shard_stores.invoke_on_all([] (auto &local_store) { - return local_store.mkfs_managers().handle_error( - crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"}); + return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) { + return mshard_store->mkfs_managers().handle_error( + crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"}); + }); }); co_await prepare_meta(new_osd_fsid); co_await umount(); @@ -507,25 +581,42 @@ SeaStore::list_collections() DEBUG("..."); ceph_assert(seastar::this_shard_id() == primary_core); - return shard_stores.map([](auto &local_store) { - return local_store.list_collections(); - }).then([FNAME](std::vector> results) { - std::vector collections; - for (auto& colls : results) { - collections.insert(collections.end(), colls.begin(), colls.end()); + return shard_stores.map_reduce0( + [this](auto& local_store) { + // For each local store, collect all collections from its mshard_stores + return seastar::map_reduce( + local_store.mshard_stores.begin(), + local_store.mshard_stores.end(), + [](auto& mshard_store) { + return mshard_store->list_collections(); + }, + std::vector(), + [](auto&& merged, auto&& result) { + merged.insert(merged.end(), result.begin(), result.end()); + return std::move(merged); + } + ); + }, + std::vector(), + [](auto&& total, auto&& shard_result) { + total.insert(total.end(), shard_result.begin(), shard_result.end()); + return std::move(total); } - DEBUG("got {} collections", collections.size()); - return seastar::make_ready_future>( - std::move(collections)); + ).then([FNAME](auto all_collections) { + DEBUG("got {} collections", all_collections.size()); + return seastar::make_ready_future>(std::move(all_collections)); }); } -store_statfs_t SeaStore::Shard::stat() const +seastar::future SeaStore::Shard::stat() const { + if(!store_active) { + return seastar::make_ready_future(store_statfs_t()); + } LOG_PREFIX(SeaStoreS::stat); auto ss = transaction_manager->store_stat(); DEBUG("stat={}", ss); - return ss; + return seastar::make_ready_future(ss); } seastar::future SeaStore::stat() const @@ -535,17 +626,26 @@ seastar::future SeaStore::stat() const ceph_assert(seastar::this_shard_id() == primary_core); return shard_stores.map_reduce0( - [](const SeaStore::Shard &local_store) { - return local_store.stat(); + [](auto& local_store) { + return seastar::map_reduce( + local_store.mshard_stores.begin(), + local_store.mshard_stores.end(), + [](auto& mshard_store) { return mshard_store->stat(); }, + store_statfs_t(), + [](auto&& ss, auto&& ret) { + ss.add(ret); + return std::move(ss); + } + ); }, store_statfs_t(), - [](auto &&ss, auto &&ret) { - ss.add(ret); - return std::move(ss); + [](auto&& total_stats, auto&& shard_stats) { + total_stats.add(shard_stats); + return std::move(total_stats); } - ).then([FNAME](store_statfs_t ss) { - DEBUG("done, stat={}", ss); - return seastar::make_ready_future(std::move(ss)); + ).then([FNAME](auto final_stats) { + DEBUG("done, stat={}", final_stats); + return seastar::make_ready_future(std::move(final_stats)); }); } @@ -568,23 +668,25 @@ seastar::future<> SeaStore::report_stats() DEBUG("..."); ceph_assert(seastar::this_shard_id() == primary_core); - shard_device_stats.resize(seastar::smp::count); - shard_io_stats.resize(seastar::smp::count); - shard_cache_stats.resize(seastar::smp::count); - return shard_stores.invoke_on_all([this](const Shard &local_store) { - bool report_detail = false; - double seconds = 0; - if (seastar::this_shard_id() == 0) { - // avoid too verbose logs, only report detail in a particular shard - report_detail = true; - seconds = local_store.reset_report_interval(); - } - shard_device_stats[seastar::this_shard_id()] = - local_store.get_device_stats(report_detail, seconds); - shard_io_stats[seastar::this_shard_id()] = - local_store.get_io_stats(report_detail, seconds); - shard_cache_stats[seastar::this_shard_id()] = - local_store.get_cache_stats(report_detail, seconds); + shard_device_stats.resize(store_shard_nums); + shard_io_stats.resize(store_shard_nums); + shard_cache_stats.resize(store_shard_nums); + return shard_stores.invoke_on_all([this](auto& local_store) { + return seastar::do_for_each(local_store.mshard_stores, [this](auto& mshard_store) { + bool report_detail = false; + double seconds = 0; + if (seastar::this_shard_id() == 0 && mshard_store->get_store_index() == 0) { + // avoid too verbose logs, only report detail in a particular shard + report_detail = true; + seconds = mshard_store->reset_report_interval(); + } + shard_device_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] = + mshard_store->get_device_stats(report_detail, seconds); + shard_io_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] = + mshard_store->get_io_stats(report_detail, seconds); + shard_cache_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] = + mshard_store->get_cache_stats(report_detail, seconds); + }); }).then([this, FNAME] { auto now = seastar::lowres_clock::now(); if (last_tp == seastar::lowres_clock::time_point::min()) { @@ -718,22 +820,19 @@ seastar::future<> SeaStore::report_stats() TransactionManager::read_extent_iertr::future> SeaStore::Shard::get_coll_bits(CollectionRef ch, Transaction &t) const { - return transaction_manager->read_collection_root(t) - .si_then([this, ch, &t](auto coll_root) { - return collection_manager->list(coll_root, t); - }).si_then([ch](auto colls) { - auto it = std::find_if(colls.begin(), colls.end(), - [ch](const std::pair& element) { - return element.first == ch->get_cid(); - }); - if (it != colls.end()) { - return TransactionManager::read_extent_iertr::make_ready_future< - std::optional>(it->second.split_bits); - } else { - return TransactionManager::read_extent_iertr::make_ready_future< - std::optional>(std::nullopt); - } - }); + auto coll_root = co_await transaction_manager->read_collection_root(t); + auto colls = co_await collection_manager->list(coll_root, t); + + auto it = std::find_if(colls.begin(), colls.end(), + [ch](const std::pair& element) { + return element.first == ch->get_cid(); + }); + + if (it != colls.end()) { + co_return it->second.split_bits; + } else { + co_return std::nullopt; + } } col_obj_ranges_t @@ -815,6 +914,7 @@ SeaStore::Shard::list_objects(CollectionRef ch, uint64_t limit, uint32_t op_flags) const { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -919,6 +1019,7 @@ SeaStore::Shard::list_objects(CollectionRef ch, seastar::future SeaStore::Shard::create_new_collection(const coll_t& cid) { + assert(store_active); LOG_PREFIX(SeaStoreS::create_new_collection); DEBUG("cid={}", cid); return seastar::make_ready_future(_get_collection(cid)); @@ -927,13 +1028,14 @@ SeaStore::Shard::create_new_collection(const coll_t& cid) seastar::future SeaStore::Shard::open_collection(const coll_t& cid) { + assert(store_active); LOG_PREFIX(SeaStoreS::open_collection); DEBUG("cid={} ...", cid); return list_collections( ).then([cid, this, FNAME] (auto colls_cores) { if (auto found = std::find(colls_cores.begin(), colls_cores.end(), - std::make_pair(cid, seastar::this_shard_id())); + std::make_pair(cid, std::make_pair(seastar::this_shard_id(), store_index))); found != colls_cores.end()) { DEBUG("cid={} exists", cid); return seastar::make_ready_future(_get_collection(cid)); @@ -948,6 +1050,7 @@ seastar::future<> SeaStore::Shard::set_collection_opts(CollectionRef c, const pool_opts_t& opts) { + assert(store_active); LOG_PREFIX(SeaStoreS::set_collection_opts); DEBUG("cid={}, opts={} not implemented", c->get_cid(), opts); //TODO @@ -957,6 +1060,9 @@ SeaStore::Shard::set_collection_opts(CollectionRef c, seastar::future> SeaStore::Shard::list_collections() { + if(!store_active) { + return seastar::make_ready_future>(); + } ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -977,12 +1083,12 @@ SeaStore::Shard::list_collections() return transaction_manager->read_collection_root(t ).si_then([this, &t](auto coll_root) { return collection_manager->list(coll_root, t); - }).si_then([&ret](auto colls) { + }).si_then([this, &ret](auto colls) { ret.resize(colls.size()); std::transform( colls.begin(), colls.end(), ret.begin(), - [](auto p) { - return std::make_pair(p.first, seastar::this_shard_id()); + [this](auto p) { + return std::make_pair(p.first, std::make_pair(seastar::this_shard_id(), store_index)); }); }); }); @@ -1008,6 +1114,7 @@ SeaStore::Shard::_read( std::size_t len, uint32_t op_flags) { + assert(store_active); LOG_PREFIX(SeaStoreS::_read); size_t size = onode.get_layout().size; if (offset >= size) { @@ -1044,6 +1151,7 @@ SeaStore::Shard::read( size_t len, uint32_t op_flags) { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1068,6 +1176,7 @@ SeaStore::Shard::exists( const ghobject_t& oid, uint32_t op_flags) { + assert(store_active); LOG_PREFIX(SeaStoreS::exists); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1101,6 +1210,7 @@ SeaStore::Shard::readv( interval_set& m, uint32_t op_flags) { + assert(store_active); LOG_PREFIX(SeaStoreS::readv); DEBUG("cid={} oid={} op_flags=0x{:x} {} intervals", ch->get_cid(), _oid, op_flags, m.num_intervals()); @@ -1131,6 +1241,7 @@ SeaStore::Shard::_get_attr( Onode& onode, std::string_view name) const { + assert(store_active); LOG_PREFIX(SeaStoreS::_get_attr); auto& layout = onode.get_layout(); if (name == OI_ATTR && layout.oi_size) { @@ -1155,6 +1266,7 @@ SeaStore::Shard::get_attr( std::string_view name, uint32_t op_flags) const { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1182,6 +1294,7 @@ SeaStore::Shard::_get_attrs( Transaction& t, Onode& onode) { + assert(store_active); auto& layout = onode.get_layout(); return omaptree_get_values( t, get_omap_root(omap_type_t::XATTR, onode), std::nullopt @@ -1210,6 +1323,7 @@ SeaStore::Shard::get_attrs( const ghobject_t& oid, uint32_t op_flags) { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1254,6 +1368,9 @@ seastar::future SeaStore::Shard::stat( const ghobject_t& oid, uint32_t op_flags) { + if(!store_active) { + return seastar::make_ready_future(); + } ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1282,11 +1399,13 @@ SeaStore::Shard::omap_get_header( const ghobject_t& oid, uint32_t op_flags) { + assert(store_active); return get_attr(ch, oid, OMAP_HEADER_XATTR_KEY, op_flags); } omap_root_t SeaStore::Shard::select_log_omap_root(Onode& onode) const { + assert(store_active); auto log_root = get_omap_root(omap_type_t::LOG, onode); if (log_root.is_null()) { return get_omap_root(omap_type_t::OMAP, onode); @@ -1303,6 +1422,7 @@ SeaStore::Shard::omap_get_values( const omap_keys_t &keys, uint32_t op_flags) { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1332,6 +1452,7 @@ SeaStore::Shard::omap_iterate( omap_iterate_cb_t callback, uint32_t op_flags) { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); return seastar::do_with( @@ -1401,6 +1522,7 @@ SeaStore::Shard::fiemap( uint64_t len, uint32_t op_flags) { + assert(store_active); ++(shard_stats.read_num); ++(shard_stats.pending_read_num); @@ -1436,6 +1558,7 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks( CollectionRef _ch, ceph::os::Transaction&& _t) { + assert(store_active); LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks); ++(shard_stats.io_num); ++(shard_stats.pending_io_num); @@ -1523,6 +1646,7 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks( seastar::future<> SeaStore::Shard::flush(CollectionRef ch) { + assert(store_active); ++(shard_stats.flush_num); ++(shard_stats.pending_flush_num); @@ -2375,7 +2499,7 @@ seastar::future<> SeaStore::write_meta( ceph_assert(seastar::this_shard_id() == primary_core); return seastar::do_with(key, value, [this, FNAME](auto& key, auto& value) { - return shard_stores.local().write_meta(key, value + return shard_stores.local().mshard_stores[0]->write_meta(key, value ).then([this, &key, &value] { return mdstore->write_meta(key, value); }).safe_then([FNAME, &key, &value] { @@ -2390,6 +2514,7 @@ seastar::future<> SeaStore::Shard::write_meta( const std::string& key, const std::string& value) { + assert(store_active); ++(shard_stats.io_num); ++(shard_stats.pending_io_num); // For TM::submit_transaction() @@ -2460,6 +2585,7 @@ uuid_d SeaStore::Shard::get_fsid() const void SeaStore::Shard::init_managers() { + assert(store_active); LOG_PREFIX(SeaStore::init_managers); DEBUG("start"); transaction_manager.reset(); @@ -2468,7 +2594,7 @@ void SeaStore::Shard::init_managers() shard_stats = {}; transaction_manager = make_transaction_manager( - device, secondaries, shard_stats, is_test); + device, secondaries, shard_stats, store_index, is_test); collection_manager = std::make_unique( *transaction_manager); onode_manager = std::make_unique( @@ -2477,6 +2603,9 @@ void SeaStore::Shard::init_managers() double SeaStore::Shard::reset_report_interval() const { + if(!store_active) { + return 0; + } double seconds; auto now = seastar::lowres_clock::now(); if (last_tp == seastar::lowres_clock::time_point::min()) { @@ -2492,12 +2621,18 @@ double SeaStore::Shard::reset_report_interval() const device_stats_t SeaStore::Shard::get_device_stats( bool report_detail, double seconds) const { + if (!store_active) { + return device_stats_t(); + } return transaction_manager->get_device_stats(report_detail, seconds); } shard_stats_t SeaStore::Shard::get_io_stats( bool report_detail, double seconds) const { + if (!store_active) { + return shard_stats_t(); + } shard_stats_t ret = shard_stats; ret.minus(last_shard_stats); @@ -2545,6 +2680,9 @@ shard_stats_t SeaStore::Shard::get_io_stats( cache_stats_t SeaStore::Shard::get_cache_stats( bool report_detail, double seconds) const { + if (!store_active) { + return cache_stats_t(); + } return transaction_manager->get_cache_stats( report_detail, seconds); } @@ -2576,6 +2714,7 @@ SeaStore::Shard::omaptree_get_value( omap_root_t&& root, std::string_view key) const { + assert(store_active); return seastar::do_with( BtreeOMapManager(*transaction_manager), std::move(root), @@ -2607,6 +2746,7 @@ SeaStore::Shard::omaptree_get_values( omap_root_t&& root, const omap_keys_t& keys) const { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_get_values); auto type = root.get_type(); if (root.is_null()) { @@ -2676,6 +2816,7 @@ SeaStore::Shard::omaptree_list( const std::optional& start, OMapManager::omap_list_config_t config) const { + assert(store_active); if (root.is_null()) { return seastar::make_ready_future( true, omap_values_t{} @@ -2698,6 +2839,7 @@ SeaStore::Shard::omaptree_get_values( omap_root_t&& root, const std::optional& start) const { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_get_values); auto type = root.get_type(); DEBUGT("{} start={} ...", t, type, start.has_value() ? *start : ""); @@ -2717,6 +2859,7 @@ SeaStore::Shard::omaptree_do_clear( Transaction& t, omap_root_t&& root) { + assert(store_active); assert(!root.is_null()); return seastar::do_with( BtreeOMapManager(*transaction_manager), @@ -2736,6 +2879,7 @@ SeaStore::Shard::omaptree_clear_no_onode( Transaction& t, omap_root_t&& root) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_clear_no_onode); if (root.is_null()) { DEBUGT("{}, null root", t, root.get_type()); @@ -2771,6 +2915,7 @@ SeaStore::Shard::omaptree_clear( omap_root_t&& root, Onode& onode) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_clear); if (root.is_null()) { DEBUGT("{}, null root", t, root.get_type()); @@ -2793,6 +2938,7 @@ SeaStore::Shard::omaptree_clone( Onode& onode, Onode& d_onode) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_clone); DEBUGT("{} start, list ...", t, type); return trans_intr::repeat([&t, &onode, &d_onode, this, type, FNAME] { @@ -2849,6 +2995,7 @@ SeaStore::Shard::omaptree_set_keys( Onode& onode, std::map&& kvs) { + assert(store_active); return seastar::do_with( BtreeOMapManager(*transaction_manager), std::move(root), @@ -2884,6 +3031,7 @@ SeaStore::Shard::omaptree_rm_keys( Onode& onode, omap_keys_t&& keys) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_rm_keys); auto type = root.get_type(); if (root.is_null()) { @@ -2921,6 +3069,7 @@ SeaStore::Shard::omaptree_rm_keyrange( std::string first, std::string last) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_rm_keyrange); auto type = root.get_type(); if (first > last) { @@ -2961,6 +3110,7 @@ SeaStore::Shard::omaptree_rm_key( Onode& onode, std::string&& name) { + assert(store_active); LOG_PREFIX(SeaStoreS::omaptree_rm_key); if (root.is_null()) { DEBUGT("{} key={}, null root", t, root.get_type(), name); diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index 1cd88cc95be..16a271049e0 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -18,6 +18,7 @@ #include "os/Transaction.h" #include "crimson/common/throttle.h" +#include "crimson/common/smp_helpers.h" #include "crimson/os/futurized_collection.h" #include "crimson/os/futurized_store.h" @@ -93,7 +94,9 @@ public: Shard( std::string root, Device* device, - bool is_test); + bool is_test, + unsigned int store_shard_nums, + unsigned int store_index = 0); ~Shard() = default; seastar::future stat( @@ -200,7 +203,7 @@ public: seastar::future get_default_device_class(); - store_statfs_t stat() const; + seastar::future stat() const; uuid_d get_fsid() const; @@ -216,6 +219,13 @@ public: cache_stats_t get_cache_stats(bool report_detail, double seconds) const; + unsigned int get_store_index() const { + return store_index; + } + + bool get_status() const { + return store_active; + } private: struct internal_context_t { CollectionRef ch; @@ -522,9 +532,11 @@ public: OnodeManagerRef onode_manager; common::Throttle throttler; + unsigned int store_index; + bool store_active = true; seastar::metrics::metric_group metrics; - void register_metrics(); + void register_metrics(unsigned int store_index); mutable shard_stats_t shard_stats; mutable seastar::lowres_clock::time_point last_tp = @@ -538,7 +550,7 @@ public: MDStoreRef mdstore); ~SeaStore(); - seastar::future<> start() final; + seastar::future start() final; seastar::future<> stop() final; Device::access_ertr::future<> _mount(); @@ -569,7 +581,7 @@ public: uuid_d get_fsid() const final { ceph_assert(seastar::this_shard_id() == primary_core); - return shard_stores.local().get_fsid(); + return shard_stores.local().mshard_stores[0]->get_fsid(); } seastar::future<> write_meta(const std::string& key, const std::string& value) final; @@ -580,8 +592,24 @@ public: seastar::future get_default_device_class() final; - FuturizedStore::Shard& get_sharded_store() final { - return shard_stores.local(); + FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final { + assert(!shard_stores.local().mshard_stores.empty()); + assert(store_index < shard_stores.local().mshard_stores.size()); + assert(shard_stores.local().mshard_stores[store_index]->get_status() == true); + return make_local_shared_foreign( + seastar::make_foreign(seastar::static_pointer_cast( + shard_stores.local().mshard_stores[store_index]))); + } + std::vector get_sharded_stores() final { + std::vector ret; + ret.reserve(shard_stores.local().mshard_stores.size()); + for (auto& mshard_store : shard_stores.local().mshard_stores) { + if (mshard_store->get_status() == true) { + ret.emplace_back(make_local_shared_foreign( + seastar::make_foreign(seastar::static_pointer_cast(mshard_store)))); + } + } + return ret; } static col_obj_ranges_t @@ -605,12 +633,38 @@ private: seastar::future<> set_secondaries(); + seastar::future<> get_shard_nums(); + seastar::future<> shard_stores_start(bool is_test); + seastar::future<> shard_stores_stop(); + private: +class MultiShardStores { + public: + std::vector> mshard_stores; + + public: + MultiShardStores(size_t count, + const std::string& root, + Device* dev, + bool is_test, + unsigned int store_shard_nums) + : mshard_stores() { + mshard_stores.reserve(count); // Reserve space for the shards + for (size_t store_index = 0; store_index < count; ++store_index) { + mshard_stores.emplace_back(seastar::make_shared( + root, dev, is_test, store_shard_nums, store_index)); + } + } + ~MultiShardStores() { + mshard_stores.clear(); + } + }; std::string root; MDStoreRef mdstore; DeviceRef device; std::vector secondaries; - seastar::sharded shard_stores; + seastar::sharded shard_stores; + unsigned int store_shard_nums = 0; mutable seastar::lowres_clock::time_point last_tp = seastar::lowres_clock::time_point::min(); diff --git a/src/crimson/os/seastore/segment_manager.h b/src/crimson/os/seastore/segment_manager.h index 1e037d320cb..ba28c836777 100644 --- a/src/crimson/os/seastore/segment_manager.h +++ b/src/crimson/os/seastore/segment_manager.h @@ -57,12 +57,11 @@ struct block_sm_superblock_t { } void validate() const { - ceph_assert(shard_num == seastar::smp::count); ceph_assert(block_size > 0); ceph_assert(segment_size > 0 && segment_size % block_size == 0); ceph_assert_always(segment_size <= SEGMENT_OFF_MAX); - for (unsigned int i = 0; i < seastar::smp::count; i ++) { + for (unsigned int i = 0; i < shard_num; i ++) { ceph_assert(shard_infos[i].size > segment_size && shard_infos[i].size % block_size == 0); ceph_assert_always(shard_infos[i].size <= DEVICE_OFF_MAX); diff --git a/src/crimson/os/seastore/segment_manager/block.cc b/src/crimson/os/seastore/segment_manager/block.cc index b68b8cc8609..f1bd3cace59 100644 --- a/src/crimson/os/seastore/segment_manager/block.cc +++ b/src/crimson/os/seastore/segment_manager/block.cc @@ -4,6 +4,8 @@ #include #include +#include + #include #include @@ -442,13 +444,53 @@ BlockSegmentManager::~BlockSegmentManager() { } +seastar::future<> BlockSegmentManager::start(unsigned int shard_nums) +{ + LOG_PREFIX(BlockSegmentManager::start); + device_shard_nums = shard_nums; + auto num_shard_services = (device_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count; + INFO("device_shard_nums={} seastar::smp={}, num_shard_services={}", device_shard_nums, seastar::smp::count, num_shard_services); + return shard_devices.start(num_shard_services, device_path, superblock.config.spec.dtype); + +} + +seastar::future<> BlockSegmentManager::stop() +{ + return shard_devices.stop(); +} + +Device& BlockSegmentManager::get_sharded_device(unsigned int store_index) +{ + assert(store_index < shard_devices.local().mshard_devices.size()); + return *shard_devices.local().mshard_devices[store_index]; +} + +SegmentManager::read_ertr::future BlockSegmentManager::get_shard_nums() +{ + return open_device( + device_path + ).safe_then([this](auto p) { + device = std::move(p.first); + auto sd = p.second; + return read_superblock(device, sd); + }).safe_then([](auto sb) { + return read_ertr::make_ready_future(sb.shard_num); + }).handle_error( + crimson::ct_error::assert_all{ + "Invalid error in BlockSegmentManager::get_shard_nums" + } + ); +} + BlockSegmentManager::mount_ret BlockSegmentManager::mount() { return shard_devices.invoke_on_all([](auto &local_device) { - return local_device.shard_mount( - ).handle_error( - crimson::ct_error::assert_all{ - "Invalid error in BlockSegmentManager::mount" + return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) { + return mshard_device->shard_mount( + ).handle_error( + crimson::ct_error::assert_all{ + "Invalid error in BlockSegmentManager::mount" + }); }); }); } @@ -462,9 +504,17 @@ BlockSegmentManager::mount_ret BlockSegmentManager::shard_mount() device = std::move(p.first); auto sd = p.second; return read_superblock(device, sd); - }).safe_then([=, this](auto sb) { + }).safe_then([=, this](auto sb) ->mount_ertr::future<> { set_device_id(sb.config.spec.id); - shard_info = sb.shard_infos[seastar::this_shard_id()]; + if(seastar::this_shard_id() + seastar::smp::count * store_index >= sb.shard_num) { + INFO("{} shard_id {} out of range {}", + device_id_printer_t{get_device_id()}, + seastar::this_shard_id() + seastar::smp::count * store_index, + sb.shard_num); + store_active = false; + return mount_ertr::now(); + } + shard_info = sb.shard_infos[seastar::this_shard_id() + seastar::smp::count * store_index]; INFO("{} read {}", device_id_printer_t{get_device_id()}, shard_info); sb.validate(); superblock = sb; @@ -491,20 +541,22 @@ BlockSegmentManager::mount_ret BlockSegmentManager::shard_mount() }); }).safe_then([this, FNAME] { INFO("{} complete", device_id_printer_t{get_device_id()}); - register_metrics(); + register_metrics(store_index); }); } BlockSegmentManager::mkfs_ret BlockSegmentManager::mkfs( device_config_t sm_config) { - return shard_devices.local().primary_mkfs(sm_config + return shard_devices.local().mshard_devices[0]->primary_mkfs(sm_config ).safe_then([this] { return shard_devices.invoke_on_all([](auto &local_device) { - return local_device.shard_mkfs( - ).handle_error( - crimson::ct_error::assert_all{ - "Invalid error in BlockSegmentManager::mkfs" + return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) { + return mshard_device->shard_mkfs( + ).handle_error( + crimson::ct_error::assert_all{ + "Invalid error in BlockSegmentManager::mkfs" + }); }); }); }); @@ -687,14 +739,23 @@ SegmentManager::read_ertr::future<> BlockSegmentManager::read( out); } -void BlockSegmentManager::register_metrics() +void BlockSegmentManager::register_metrics(unsigned int store_index) { LOG_PREFIX(BlockSegmentManager::register_metrics); + if (!store_active) { + INFO("{} shard {} is not active, skip registering metrics", + device_id_printer_t{get_device_id()}, store_index); + return; + } + DEBUG("{}", device_id_printer_t{get_device_id()}); namespace sm = seastar::metrics; std::vector label_instances; label_instances.push_back(sm::label_instance("device_id", get_device_id())); + label_instances.push_back( + sm::label_instance("shard_device_index", std::to_string(store_index))); stats.reset(); + metrics.add_group( "segment_manager", { @@ -702,61 +763,61 @@ void BlockSegmentManager::register_metrics() "data_read_num", stats.data_read.num, sm::description("total number of data read"), - label_instances + label_instances ), sm::make_counter( "data_read_bytes", stats.data_read.bytes, sm::description("total bytes of data read"), - label_instances + label_instances ), sm::make_counter( "data_write_num", stats.data_write.num, sm::description("total number of data write"), - label_instances + label_instances ), sm::make_counter( "data_write_bytes", stats.data_write.bytes, sm::description("total bytes of data write"), - label_instances + label_instances ), sm::make_counter( "metadata_write_num", stats.metadata_write.num, sm::description("total number of metadata write"), - label_instances + label_instances ), sm::make_counter( "metadata_write_bytes", stats.metadata_write.bytes, sm::description("total bytes of metadata write"), - label_instances + label_instances ), sm::make_counter( "opened_segments", stats.opened_segments, sm::description("total segments opened"), - label_instances + label_instances ), sm::make_counter( "closed_segments", stats.closed_segments, sm::description("total segments closed"), - label_instances + label_instances ), sm::make_counter( "closed_segments_unused_bytes", stats.closed_segments_unused_bytes, sm::description("total unused bytes of closed segments"), - label_instances + label_instances ), sm::make_counter( "released_segments", stats.released_segments, sm::description("total segments released"), - label_instances + label_instances ), } ); diff --git a/src/crimson/os/seastore/segment_manager/block.h b/src/crimson/os/seastore/segment_manager/block.h index a0445371016..05a253e3f64 100644 --- a/src/crimson/os/seastore/segment_manager/block.h +++ b/src/crimson/os/seastore/segment_manager/block.h @@ -112,17 +112,12 @@ public: class BlockSegmentManager final : public SegmentManager { // interfaces used by Device public: - seastar::future<> start() { - return shard_devices.start(device_path, superblock.config.spec.dtype); - } + seastar::future<> start(unsigned int shard_nums) final; - seastar::future<> stop() { - return shard_devices.stop(); - } + seastar::future<> stop() final; + + Device& get_sharded_device(unsigned int store_index = 0) final; - Device& get_sharded_device() final { - return shard_devices.local(); - } mount_ret mount() final; mkfs_ret mkfs(device_config_t) final; @@ -132,8 +127,10 @@ public: BlockSegmentManager( const std::string &path, - device_type_t dtype) - : device_path(path) { + device_type_t dtype, + unsigned int store_index = 0) + : device_path(path), + store_index(store_index) { ceph_assert(get_device_type() == device_type_t::NONE); superblock.config.spec.dtype = dtype; } @@ -149,6 +146,8 @@ public: size_t len, ceph::bufferptr &out) final; + read_ertr::future get_shard_nums() final; + device_type_t get_device_type() const final { return superblock.config.spec.dtype; } @@ -213,7 +212,7 @@ private: } } stats; - void register_metrics(); + void register_metrics(unsigned int store_index); seastar::metrics::metric_group metrics; std::string device_path; @@ -256,7 +255,29 @@ private: // all shards mount mount_ret shard_mount(); - seastar::sharded shard_devices; + unsigned int device_shard_nums = 0; + unsigned int store_index = 0; + bool store_active = true; + class MultiShardDevices { + public: + std::vector> mshard_devices; + + public: + MultiShardDevices(size_t count, + const std::string path, + device_type_t dtype) + : mshard_devices() { + mshard_devices.reserve(count); + for (size_t store_index = 0; store_index < count; ++store_index) { + mshard_devices.emplace_back(std::make_unique( + path, dtype, store_index)); + } + } + ~MultiShardDevices() { + mshard_devices.clear(); + } + }; + seastar::sharded shard_devices; }; } -- 2.39.5