From 6767df13b19951287d744f0ea8b1bb63d6290260 Mon Sep 17 00:00:00 2001
From: Chunmei Liu
Date: Wed, 20 Aug 2025 23:56:34 +0000
Subject: [PATCH] crimson/osd/shard_services: get multiple store shards for
 per-shard local state, and use the store index to create pg mappings

Signed-off-by: Chunmei Liu
---
 src/crimson/osd/osd.cc                        | 37 ++++++----
 .../osd/osd_operations/pg_advance_map.cc      |  4 +-
 src/crimson/osd/pg_shard_manager.cc           | 13 ++--
 src/crimson/osd/pg_shard_manager.h            | 19 +++---
 src/crimson/osd/shard_services.cc             | 68 +++++++++++--------
 src/crimson/osd/shard_services.h              | 47 ++++++++++---
 6 files changed, 121 insertions(+), 67 deletions(-)

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index c66779f6f1e..19aa4d03c41 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -183,7 +183,7 @@ CompatSet get_osd_initial_compat_set()
 seastar::future<> OSD::open_meta_coll()
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return store.get_sharded_store().open_collection(
+  return store.get_sharded_store()->open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
     pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
@@ -257,9 +257,9 @@ seastar::future<MessageURef> OSD::get_perf_reports() {
 
 seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store)
 {
-  return store.get_sharded_store().open_collection(coll_t::meta()).then([&store](auto ch) {
+  return store.get_sharded_store()->open_collection(coll_t::meta()).then([&store](auto ch) {
     if (!ch) {
-      return store.get_sharded_store().create_new_collection(
+      return store.get_sharded_store()->create_new_collection(
         coll_t::meta()
       ).then([&store](auto ch) {
         return OSDMeta(ch, store.get_sharded_store());
@@ -360,7 +360,7 @@ seastar::future<> OSD::_write_superblock(
       meta_coll.create(t);
       meta_coll.store_superblock(t, superblock);
       DEBUG("OSD::_write_superblock: do_transaction...");
-      return store.get_sharded_store().do_transaction(
+      return store.get_sharded_store()->do_transaction(
         meta_coll.collection(),
         std::move(t));
     }),
@@ -451,25 +451,36 @@ seastar::future<> OSD::start()
   }
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return store.start().then([this] {
-    return pg_to_shard_mappings.start(0, seastar::smp::count
+  return store.start().then([this] (auto store_shard_nums) {
+    return pg_to_shard_mappings.start(0, seastar::smp::count, store_shard_nums
     ).then([this] {
       return osd_singleton_state.start_single(
         whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
         std::ref(*monc), std::ref(*mgrc));
     }).then([this] {
       return osd_states.start();
-    }).then([this] {
+    }).then([this, store_shard_nums] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
        std::ref(pg_to_shard_mappings),
+       store_shard_nums,
        whoami,
        startup_time,
        osd_singleton_state.local().perf,
        osd_singleton_state.local().recoverystate_perf,
        std::ref(store),
        std::ref(osd_states));
+    }).then([this, FNAME] {
+      return shard_services.invoke_on_all(
+       [this](auto& local_service) {
+         local_service.set_container(shard_services);
+       });
+    }).then([this, FNAME] {
+      return shard_services.invoke_on_all(
+       [this](auto& local_service) {
+         return local_service.get_remote_store();
+       });
     });
   }).then([this, FNAME] {
     heartbeat.reset(new Heartbeat{
@@ -1213,7 +1224,7 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
 
   co_await pg_shard_manager.set_superblock(superblock);
   DEBUG("submitting transaction");
-  co_await store.get_sharded_store().do_transaction(
+  co_await store.get_sharded_store()->do_transaction(
     pg_shard_manager.get_meta_coll().collection(),
     std::move(t));
   // TODO: write to superblock and commit the transaction
@@ -1586,7 +1597,7 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
   std::vector<seastar::future<>> futures;
   std::vector<seastar::future<>> cleanup_futures;
 
-  auto collection_future = store.get_sharded_store().open_collection(
+  auto collection_future = store.get_sharded_store()->open_collection(
     coll_t::meta());
   auto collection_ref = co_await std::move(collection_future);
   ceph::os::Transaction cleanup_t;
@@ -1603,10 +1614,10 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
         ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
       t.write(coll_t::meta(), oid, 0, data.size(), bl);
-      futures.push_back(store.get_sharded_store().do_transaction(
+      futures.push_back(store.get_sharded_store()->do_transaction(
         collection_ref, std::move(t)));
       cleanup_t.remove(coll_t::meta(), oid);
-      cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+      cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
         collection_ref, std::move(cleanup_t)));
     }
   }
@@ -1640,12 +1651,12 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
 
     t.write(coll_t::meta(), oid, offset, bsize, bl);
-    futures_bench.push_back(store.get_sharded_store().do_transaction(
+    futures_bench.push_back(store.get_sharded_store()->do_transaction(
       collection_ref, std::move(t)));
 
     if (!onum || !osize) {
       cleanup_t.remove(coll_t::meta(), oid);
-      cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+      cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
        collection_ref, std::move(cleanup_t)));
     }
   }
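The osd.cc hunks above are mechanical: get_sharded_store() now hands back a
pointer-like store-shard handle instead of a reference, so every call site
moves from '.' to '->'. A toy model of that API shift, with assumed names
rather than the actual crimson declarations:

    #include <cstdio>
    #include <memory>

    // Toy model: the store now exposes its shard through a pointer-like
    // handle (names assumed; not the crimson declarations).
    struct StoreShard {
      void open_collection(const char* cid) { std::printf("open %s\n", cid); }
    };

    struct StoreModel {
      std::shared_ptr<StoreShard> shard = std::make_shared<StoreShard>();
      // before: StoreShard& get_sharded_store() { return *shard; }
      std::shared_ptr<StoreShard> get_sharded_store() { return shard; }
    };

    int main() {
      StoreModel store;
      store.get_sharded_store()->open_collection("meta");  // was '.' on a reference
    }

A handle rather than a reference is what later lets shards without a local
store hold a borrowed one.
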
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index e136e168ca8..f8edcd21e9e 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.cc
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -168,11 +168,11 @@ seastar::future<> PGAdvanceMap::split_pg(
     children_pgids.insert(child_pgid);
 
     // Map each child pg ID to a core
-    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id());
+    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id(), 0);
     DEBUG(" PG {} mapped to {}", child_pgid.pgid, core);
     DEBUG(" {} map epoch: {}", child_pgid.pgid, pg_epoch);
     auto map = next_map;
-    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true);
+    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, 0, true);
     DEBUG(" Parent pgid: {}", pg->get_pgid());
     DEBUG(" Child pgid: {}", child_pg->get_pgid());
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 3ae43d4d7ec..e9563e63b68 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -20,19 +20,20 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
   return seastar::parallel_for_each(
     colls_cores,
     [this](auto coll_core) {
-      auto[coll, shard_core] = coll_core;
+      auto[coll, shard_core_index] = coll_core;
+      auto[shard_core, store_index] = shard_core_index;
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
        return get_pg_to_shard_mapping().get_or_create_pg_mapping(
-         pgid, shard_core
-       ).then([this, pgid] (auto core) {
+         pgid, shard_core, store_index
+       ).then([this, pgid] (auto core_store) {
          return this->with_remote_shard_state(
-           core,
-           [pgid](
+           core_store.first,
+           [pgid, core_store](
              PerShardState &per_shard_state,
              ShardServices &shard_services) {
              return shard_services.load_pg(
-              pgid
+              pgid, core_store.second
             ).then([pgid, &per_shard_state](auto &&pg) {
              logger().info("load_pgs: loaded {}", pgid);
              return pg->clear_temp_objects(
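The load path above now resolves each on-disk collection to a
(core, store_index) pair and threads both through get_or_create_pg_mapping().
A minimal, synchronous model of that extended mapping (the real PGShardMapping
is sharded and returns futures; types here are simplified):

    #include <cstdio>
    #include <map>
    #include <utility>

    // A pg id now maps to (core, store_index) instead of a bare core.
    using core_store_t = std::pair<unsigned /*core*/, unsigned /*store_index*/>;

    std::map<unsigned, core_store_t> pg_mapping;  // pgid -> (core, store)

    core_store_t get_or_create_pg_mapping(unsigned pgid, unsigned core, unsigned store) {
      // Insert on first sight; an existing assignment wins over the hint.
      auto [it, inserted] = pg_mapping.try_emplace(pgid, core, store);
      return it->second;
    }

    int main() {
      auto core_store = get_or_create_pg_mapping(1, 2, 0);
      std::printf("pg 1 -> core %u, store %u\n", core_store.first, core_store.second);
    }

Callers then read core_store.first to pick the reactor shard and
core_store.second to pick the store shard, which is exactly how load_pgs()
consumes the pair.
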
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index f0f4cbc9b25..f202f2cd332 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -223,17 +223,19 @@ public:
   template <typename T>
   seastar::future<> run_with_pg_maybe_create(
     typename T::IRef op,
-    ShardServices &target_shard_services
+    ShardServices &target_shard_services,
+    unsigned int store_index
   ) {
     static_assert(T::can_create());
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     auto &opref = *op;
     return opref.template with_blocking_event<
       PGMap::PGCreationBlockingEvent
-    >([&target_shard_services, &opref](auto &&trigger) {
+    >([&target_shard_services, &opref, store_index](auto &&trigger) {
       return target_shard_services.get_or_create_pg(
        std::move(trigger),
        opref.get_pgid(),
+       store_index,
        opref.get_create_info()
       );
     }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
@@ -407,22 +409,22 @@ public:
        return seastar::make_exception_future<>(fut.get_exception());
       }
 
-      auto core = fut.get();
+      auto core_store = fut.get();
       logger.debug("{}: can_create={}, target-core={}",
-                  *op, T::can_create(), core);
+                  *op, T::can_create(), core_store.first);
       return this->template with_remote_shard_state_and_op<T>(
-       core, std::move(op),
-       [this](ShardServices &target_shard_services,
+       core_store.first, std::move(op),
+       [this, core_store](ShardServices &target_shard_services,
               typename T::IRef op) {
        auto &opref = *op;
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        logger.debug("{}: entering create_or_wait_pg", opref);
        return opref.template enter_stage<>(
          opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
-       ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+       ).then([this, &target_shard_services, op=std::move(op), core_store]() mutable {
          if constexpr (T::can_create()) {
            return this->template run_with_pg_maybe_create<T>(
-             std::move(op), target_shard_services);
+             std::move(op), target_shard_services, core_store.second);
          } else {
            return this->template run_with_pg_maybe_wait<T>(
              std::move(op), target_shard_services);
@@ -469,6 +471,7 @@ public:
          opref, opref.get_pgid());
        return seastar::now();
       }
+      SUBDEBUG(osd, "{}: have_pg", opref);
       return op->with_pg(
        target_shard_services, pg
       ).finally([op] {});
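Note how every continuation in this pipeline captures core_store by value:
the pair has to stay valid across enter_stage<>() and the cross-core hop, so
a reference capture would dangle. The distilled capture pattern, with
std::function standing in for a future continuation:

    #include <cstdio>
    #include <functional>
    #include <utility>

    // The (core, store_index) pair is copied into the continuation so it
    // outlives the frame that produced it, as the lambdas above do.
    std::function<void()> make_continuation(std::pair<unsigned, unsigned> core_store) {
      return [core_store] {  // by value: survives the cross-core hop
        std::printf("resume on core %u with store %u\n",
                    core_store.first, core_store.second);
      };
    }

    int main() {
      auto cont = make_continuation({3, 1});
      cont();  // the original pair is long gone; the copy is still valid
    }
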
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 09da0412416..db1d0d19ce3 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -40,7 +40,7 @@ PerShardState::PerShardState(
   crimson::os::FuturizedStore &store,
   OSDState &osd_state)
   : whoami(whoami),
-    store(store.get_sharded_store()),
+    stores(store.get_sharded_stores()),
     osd_state(osd_state),
     osdmap_gate("PerShardState::osdmap_gate"),
     perf(perf), recoverystate_perf(recoverystate_perf),
@@ -560,6 +560,7 @@ void OSDSingletonState::trim_maps(ceph::os::Transaction& t,
 seastar::future<Ref<PG>> ShardServices::make_pg(
   OSDMapService::cached_map_t create_map,
   spg_t pgid,
+  unsigned store_index,
   bool do_create)
 {
   using ec_profile_t = std::map<std::string, std::string>;
@@ -583,41 +584,45 @@ seastar::future<Ref<PG>> ShardServices::make_pg(
       return get_pool_info(pgid.pool());
     }
   };
-  auto get_collection = [pgid, do_create, this] {
+  auto get_collection = [pgid, do_create, store_index, this] {
    const coll_t cid{pgid};
    if (do_create) {
-     return get_store().create_new_collection(cid);
+     return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::create_new_collection>(
+       get_store(store_index), cid);
    } else {
-     return get_store().open_collection(cid);
+     return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+       get_store(store_index), cid);
    }
   };
   return seastar::when_all(
    std::move(get_pool_info_for_pg),
    std::move(get_collection)
-  ).then([pgid, create_map, this](auto &&ret) {
+  ).then([pgid, create_map, store_index, this](auto &&ret) {
    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get());
    auto coll = std::move(std::get<1>(ret).get());
    return seastar::make_ready_future<Ref<PG>>(
      new PG{
-       pgid,
-       pg_shard_t{local_state.whoami, pgid.shard},
-       std::move(coll),
-       std::move(pool),
-       std::move(name),
-       create_map,
-       *this,
-       ec_profile});
+       pgid,
+       pg_shard_t{local_state.whoami, pgid.shard},
+       std::move(store_index),
+       std::move(coll),
+       std::move(pool),
+       std::move(name),
+       create_map,
+       *this,
+       ec_profile});
   });
 }
 
 seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
+  unsigned int store_index,
   std::unique_ptr<PGCreateInfo> info) {
   return seastar::do_with(
     std::move(info),
-    [this](auto &info)
+    [store_index, this](auto &info)
     -> seastar::future<Ref<PG>> {
       return get_map(info->epoch).then(
-       [&info, this](cached_map_t startmap)
+       [&info, store_index, this](cached_map_t startmap)
        -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          LOG_PREFIX(ShardServices::handle_pg_create_info);
          const spg_t &pgid = info->pgid;
@@ -659,7 +664,7 @@ seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
            }
          }
          return make_pg(
-           startmap, pgid, true
+           startmap, pgid, store_index, true
          ).then([startmap=std::move(startmap)](auto pg) mutable {
            return seastar::make_ready_future<
              std::tuple<Ref<PG>, OSDMapService::cached_map_t>
@@ -716,6 +721,7 @@ ShardServices::get_or_create_pg_ret
 ShardServices::get_or_create_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
   spg_t pgid,
+  unsigned int store_index,
   std::unique_ptr<PGCreateInfo> info)
 {
   if (info) {
@@ -724,7 +730,8 @@ ShardServices::get_or_create_pg(
     if (!existed) {
       local_state.pg_map.set_creating(pgid);
       (void)handle_pg_create_info(
-       std::move(info));
+       store_index,
+       std::move(info));
     }
     return std::move(fut);
   } else {
@@ -754,20 +761,19 @@ ShardServices::create_split_pg(
   return std::move(fut);
 }
 
-seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
-
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid, unsigned int store_index)
 {
   LOG_PREFIX(OSDSingletonState::load_pg);
   DEBUG("{}", pgid);
 
-  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
+  return seastar::do_with(PGMeta(get_store(store_index), pgid), [](auto& pg_meta) {
    return pg_meta.get_epoch();
   }).then([this](epoch_t e) {
    return get_map(e);
-  }).then([pgid, this](auto&& create_map) {
-   return make_pg(std::move(create_map), pgid, false);
-  }).then([this](Ref<PG> pg) {
-   return pg->read_state(&get_store()).then([pg] {
+  }).then([pgid, store_index, this](auto&& create_map) {
+   return make_pg(std::move(create_map), pgid, store_index, false);
+  }).then([store_index, this](Ref<PG> pg) {
+   return pg->read_state(get_store(store_index)).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
    });
  }).handle_exception([FNAME, pgid](auto ep) {
@@ -778,11 +784,13 @@
 }
 
 seastar::future<> ShardServices::dispatch_context_transaction(
-  crimson::os::CollectionRef col, PeeringCtx &ctx) {
+  crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index) {
   LOG_PREFIX(OSDSingletonState::dispatch_context_transaction);
   if (ctx.transaction.empty()) {
     DEBUG("empty transaction");
-    co_await get_store().flush(col);
+    co_await crimson::os::with_store_do_transaction(
+      get_store(store_index),
+      col, ceph::os::Transaction{});
     Context* on_commit(
       ceph::os::Transaction::collect_all_contexts(ctx.transaction));
     if (on_commit) {
@@ -792,7 +800,8 @@ seastar::future<> ShardServices::dispatch_context_transaction(
   }
 
   DEBUG("do_transaction ...");
-  co_await get_store().do_transaction(
+  co_await crimson::os::with_store_do_transaction(
+    get_store(store_index),
    col,
    ctx.transaction.claim_and_reset());
   co_return;
@@ -821,17 +830,18 @@ seastar::future<> ShardServices::dispatch_context_messages(
 }
 
 seastar::future<> ShardServices::dispatch_context(
+  unsigned int store_index,
   crimson::os::CollectionRef col,
   PeeringCtx &&pctx)
 {
   return seastar::do_with(
     std::move(pctx),
-    [this, col](auto &ctx) {
+    [this, col, store_index](auto &ctx) {
      ceph_assert(col || ctx.transaction.empty());
      return seastar::when_all_succeed(
        dispatch_context_messages(
          BufferedRecoveryMessages{ctx}),
-       col ? dispatch_context_transaction(col, ctx) : seastar::now()
+       col ? dispatch_context_transaction(col, ctx, store_index) : seastar::now()
      ).then_unpack([] {
        return seastar::now();
      });
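The get_collection lambda above routes store operations through
crimson::os::with_store<...>, a member-function-pointer dispatch over
whichever store handle the shard holds. A distilled sketch of that dispatch
shape (the real helper also deals with foreign pointers and futures; only
the mechanism is shown, with illustrative names):

    #include <cstdio>
    #include <memory>
    #include <utility>

    // Forward a member-function call through a pointer-like shard handle.
    template <auto MemFn, typename ShardRef, typename... Args>
    decltype(auto) with_store(ShardRef&& shard, Args&&... args) {
      return ((*shard).*MemFn)(std::forward<Args>(args)...);
    }

    struct StoreShard {
      void open_collection(const char* cid) { std::printf("open %s\n", cid); }
      void create_new_collection(const char* cid) { std::printf("create %s\n", cid); }
    };

    int main() {
      auto shard = std::make_shared<StoreShard>();
      with_store<&StoreShard::create_new_collection>(shard, "pg.1");
      with_store<&StoreShard::open_collection>(shard, "pg.1");
    }

Passing the member-function pointer as a template argument keeps the call
site generic over which store operation runs while the handle type stays
opaque to the caller.
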
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 42146427e9d..de9c6e25b69 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -67,7 +67,7 @@ class PerShardState {
 #define assert_core() ceph_assert(seastar::this_shard_id() == core);
 
   const int whoami;
-  crimson::os::FuturizedStore::Shard &store;
+  std::vector<crimson::os::FuturizedStore::StoreShardRef> stores;
   crimson::common::CephContext cct;
 
   OSDState &osd_state;
@@ -353,6 +353,8 @@ class ShardServices : public OSDMapService {
   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   PGShardMapping& pg_to_shard_mapping;
+  seastar::sharded<ShardServices>* s_container = nullptr;
+  unsigned int store_shard_nums = 0;
 
   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -463,15 +465,37 @@ public:
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
     PGShardMapping& pg_to_shard_mapping,
+    unsigned int store_shard_nums,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
       osd_singleton_state(osd_singleton_state),
-      pg_to_shard_mapping(pg_to_shard_mapping) {}
+      pg_to_shard_mapping(pg_to_shard_mapping),
+      store_shard_nums(store_shard_nums) {}
 
   FORWARD_TO_OSD_SINGLETON(send_to_osd)
 
-  crimson::os::FuturizedStore::Shard &get_store() {
-    return local_state.store;
+  void set_container(seastar::sharded<ShardServices>& ss) { s_container = &ss; }
+
+  seastar::future<> get_remote_store() {
+    if (local_state.stores.empty()) {
+      return s_container->invoke_on(
+       seastar::this_shard_id() % store_shard_nums,
+       [] (auto& remote_service) {
+         assert(remote_service.local_state.stores.size() == 1);
+         auto ret = remote_service.local_state.stores[0].get_foreign();
+         return std::move(ret);
+       }).then([this](auto&& remote_store) {
+         local_state.stores.emplace_back(make_local_shared_foreign(std::move(remote_store)));
+         return seastar::now();
+       });
+    } else {
+      return seastar::now();
+    }
+  }
+
+  crimson::os::FuturizedStore::StoreShardRef get_store(unsigned int store_index) {
+    assert(store_index < local_state.stores.size());
+    return local_state.stores[store_index];
   }
 
   struct shard_stats_t {
@@ -481,8 +505,8 @@
     return {get_reactor_utilization()};
   }
 
-  auto create_split_pg_mapping(spg_t pgid, core_id_t core) {
-    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core);
+  auto create_split_pg_mapping(spg_t pgid, core_id_t core, unsigned int store_index) {
+    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
   }
 
   auto remove_pg(spg_t pgid) {
@@ -524,8 +548,10 @@
   seastar::future<Ref<PG>> make_pg(
     cached_map_t create_map,
     spg_t pgid,
+    unsigned int store_index,
     bool do_create);
   seastar::future<Ref<PG>> handle_pg_create_info(
+    unsigned int store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -533,6 +559,7 @@
   get_or_create_pg_ret get_or_create_pg(
     PGMap::PGCreationBlockingEvent::TriggerI&&,
     spg_t pgid,
+    unsigned int store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -543,11 +570,11 @@
     PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
     spg_t pgid);
 
-  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid, unsigned int store_index);
 
   /// Dispatch and reset ctx transaction
   seastar::future<> dispatch_context_transaction(
-    crimson::os::CollectionRef col, PeeringCtx &ctx);
+    crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index);
 
   /// Dispatch and reset ctx messages
   seastar::future<> dispatch_context_messages(
@@ -555,13 +582,15 @@
   /// Dispatch ctx and dispose of context
   seastar::future<> dispatch_context(
+    unsigned int store_index,
     crimson::os::CollectionRef col,
     PeeringCtx &&ctx);
 
   /// Dispatch ctx and dispose of ctx, transaction must be empty
   seastar::future<> dispatch_context(
+    unsigned int store_index,
     PeeringCtx &&ctx) {
-    return dispatch_context({}, std::move(ctx));
+    return dispatch_context(store_index, {}, std::move(ctx));
   }
 
   PerShardPipeline &get_client_request_pipeline() {
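get_remote_store() is the piece that lets more reactor shards than store
shards coexist: a shard whose stores vector is empty borrows, via
invoke_on(), the handle owned by shard (this_shard_id() % store_shard_nums).
A plain-C++ model of the resulting sharing, with shared_ptr standing in for
seastar's foreign_ptr plumbing:

    #include <cstdio>
    #include <memory>
    #include <vector>

    struct StoreShard { unsigned id; };

    int main() {
      const unsigned reactor_shards = 4, store_shard_nums = 2;  // assumed counts
      // Shards [0, store_shard_nums) own a store; the rest start empty.
      std::vector<std::vector<std::shared_ptr<StoreShard>>> stores(reactor_shards);
      for (unsigned i = 0; i < store_shard_nums; ++i)
        stores[i].push_back(std::make_shared<StoreShard>(StoreShard{i}));
      // get_remote_store(): borrow from shard (i % store_shard_nums) if empty.
      for (unsigned i = 0; i < reactor_shards; ++i)
        if (stores[i].empty())
          stores[i].push_back(stores[i % store_shard_nums][0]);
      for (unsigned i = 0; i < reactor_shards; ++i)
        std::printf("reactor shard %u uses store %u\n", i, stores[i][0]->id);
    }

The modulo rule spreads the borrowing shards evenly over the store owners,
which is why store.start() must report store_shard_nums up front.
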
-- 
2.39.5