From: chunmei liu
Date: Tue, 3 Feb 2026 22:29:26 +0000 (-0800)
Subject: crimson/osd/shard_services: get multiple store shards for per-shard local state, and use...
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f26a59e8bfd974138919a058731305425a6d875c;p=ceph-ci.git

crimson/osd/shard_services: get multiple store shards for per-shard local state, and use store index to create pg mapping

Signed-off-by: chunmei liu
---
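
[Illustrative note, not part of the commit.] The thread of this change:
store.start() now reports how many backend store shards exist
(store_shard_nums), PGShardMapping keeps a (core, store_index) pair per PG
instead of a bare core, and every store-touching path (make_pg, load_pg,
handle_pg_create_info, dispatch_context_transaction) carries the PG's
store_index down to get_store(store_index). A minimal sketch of the mapping
half follows, assuming a modulo placement policy; the names
(pg_core_store_map_t, pg_id_t) are hypothetical stand-ins, not crimson API.

// Hypothetical model: pin each PG to a (seastar core, store shard) pair,
// so callers can route both scheduling and backend-store access.
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

using pg_id_t = uint64_t;                             // stand-in for spg_t
using core_id_t = unsigned;
using core_store_t = std::pair<core_id_t, uint32_t>;  // (core, store_index)

class pg_core_store_map_t {
  std::map<pg_id_t, core_store_t> mapping;
  unsigned num_cores, num_store_shards;
public:
  pg_core_store_map_t(unsigned cores, unsigned stores)
    : num_cores(cores), num_store_shards(stores) {}

  // Mirrors the shape of get_or_create_pg_mapping(): the first call fixes
  // the placement, later calls read the same pair back.
  core_store_t get_or_create(pg_id_t pgid) {
    auto it = mapping.find(pgid);
    if (it == mapping.end()) {
      it = mapping.emplace(
        pgid,
        core_store_t{core_id_t(pgid % num_cores),
                     uint32_t(pgid % num_store_shards)}).first;
    }
    return it->second;
  }
};

int main() {
  pg_core_store_map_t pgmap{4 /* cores */, 2 /* store shards */};
  for (pg_id_t pg : {1, 2, 3, 4, 5}) {
    auto [core, store] = pgmap.get_or_create(pg);
    std::cout << "pg " << pg << " -> core " << core
              << ", store shard " << store << "\n";
  }
  return 0;
}

A split child stays consistent under such a scheme only if it is mapped
explicitly, which is why split_pg() below passes pg->get_store_index() when
creating the child's mapping rather than recomputing a placement.
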
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index b3acb5f4c6d..e03d7c634da 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -190,7 +190,7 @@ seastar::future<> OSD::open_meta_coll()
     coll_t::meta()
   ).then([this, FNAME](auto ch) {
     DEBUG("registering metadata collection");
-    pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
+    pg_shard_manager.init_meta_coll(ch, store.get_backend_store(0));
     return seastar::now();
   });
 }
@@ -269,11 +269,11 @@ seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store)
     return store.get_sharded_store().create_new_collection(
       coll_t::meta()
     ).then([&store](auto ch) {
-      return OSDMeta(ch, store.get_sharded_store());
+      return OSDMeta(ch, store.get_backend_store(0));
     });
   } else {
     DEBUG("meta collection already exists");
-    return seastar::make_ready_future<OSDMeta>(ch, store.get_sharded_store());
+    return seastar::make_ready_future<OSDMeta>(ch, store.get_backend_store(0));
   }
  });
 }
@@ -462,19 +462,20 @@ seastar::future<> OSD::start()
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   DEBUG("starting store");
-  return store.start().then([this] {
-    return pg_to_shard_mappings.start(0, seastar::smp::count
+  return store.start().then([this] (auto store_shard_nums) {
+    return pg_to_shard_mappings.start(0, seastar::smp::count, store_shard_nums
     ).then([this] {
       return osd_singleton_state.start_single(
         whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
         std::ref(*monc), std::ref(*mgrc));
     }).then([this] {
       return osd_states.start();
-    }).then([this] {
+    }).then([this, store_shard_nums] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
         std::ref(pg_to_shard_mappings),
+        store_shard_nums,
         whoami,
         startup_time,
         osd_singleton_state.local().perf,
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index e136e168ca8..e7d6a4549ca 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.cc
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -168,11 +168,11 @@ seastar::future<> PGAdvanceMap::split_pg(
 
     children_pgids.insert(child_pgid);
     // Map each child pg ID to a core
-    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id());
+    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id(), pg->get_store_index());
     DEBUG(" PG {} mapped to {}", child_pgid.pgid, core);
     DEBUG(" {} map epoch: {}", child_pgid.pgid, pg_epoch);
     auto map = next_map;
-    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true);
+    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, pg->get_store_index(), true);
     DEBUG(" Parent pgid: {}", pg->get_pgid());
     DEBUG(" Child pgid: {}", child_pg->get_pgid());
 
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 3ae43d4d7ec..e9563e63b68 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -20,19 +20,20 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
   return seastar::parallel_for_each(
     colls_cores,
     [this](auto coll_core) {
-      auto[coll, shard_core] = coll_core;
+      auto[coll, shard_core_index] = coll_core;
+      auto[shard_core, store_index] = shard_core_index;
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
         return get_pg_to_shard_mapping().get_or_create_pg_mapping(
-          pgid, shard_core
-        ).then([this, pgid] (auto core) {
+          pgid, shard_core, store_index
+        ).then([this, pgid] (auto core_store) {
           return this->with_remote_shard_state(
-            core,
-            [pgid](
+            core_store.first,
+            [pgid, core_store](
              PerShardState &per_shard_state,
              ShardServices &shard_services) {
              return shard_services.load_pg(
-               pgid
+               pgid, core_store.second
              ).then([pgid, &per_shard_state](auto &&pg) {
                logger().info("load_pgs: loaded {}", pgid);
                return pg->clear_temp_objects(
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index f0f4cbc9b25..40dd1eb7529 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -223,17 +223,19 @@ public:
   template <typename T>
   seastar::future<> run_with_pg_maybe_create(
     typename T::IRef op,
-    ShardServices &target_shard_services
+    ShardServices &target_shard_services,
+    uint32_t store_index
   ) {
     static_assert(T::can_create());
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     auto &opref = *op;
     return opref.template with_blocking_event<
       PGMap::PGCreationBlockingEvent
-    >([&target_shard_services, &opref](auto &&trigger) {
+    >([&target_shard_services, &opref, store_index](auto &&trigger) {
       return target_shard_services.get_or_create_pg(
         std::move(trigger),
         opref.get_pgid(),
+        store_index,
         opref.get_create_info()
       );
     }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
@@ -407,23 +409,24 @@ public:
         return seastar::make_exception_future<>(fut.get_exception());
       }
-      auto core = fut.get();
+      auto core_store = fut.get();
       logger.debug("{}: can_create={}, target-core={}",
-                   *op, T::can_create(), core);
+                   *op, T::can_create(), core_store.first);
       return this->template with_remote_shard_state_and_op<T>(
-        core, std::move(op),
-        [this](ShardServices &target_shard_services,
+        core_store.first, std::move(op),
+        [this, core_store](ShardServices &target_shard_services,
                typename T::IRef op) {
          auto &opref = *op;
          auto &logger = crimson::get_logger(ceph_subsys_osd);
          logger.debug("{}: entering create_or_wait_pg", opref);
          return opref.template enter_stage<>(
            opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
-         ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+         ).then([this, &target_shard_services, op=std::move(op), core_store]() mutable {
            if constexpr (T::can_create()) {
              return this->template run_with_pg_maybe_create<T>(
-               std::move(op), target_shard_services);
+               std::move(op), target_shard_services, core_store.second);
            } else {
+             (void)core_store; // silence unused capture warning
              return this->template run_with_pg_maybe_wait<T>(
                std::move(op), target_shard_services);
            }
          });
@@ -469,6 +472,7 @@ public:
               opref, opref.get_pgid());
             return seastar::now();
           }
+          SUBDEBUG(osd, "{}: have_pg", opref);
           return op->with_pg(
             target_shard_services, pg
           ).finally([op] {});
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 09da0412416..f9389db0d76 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -40,7 +40,7 @@ PerShardState::PerShardState(
   crimson::os::FuturizedStore &store,
   OSDState &osd_state)
   : whoami(whoami),
-    store(store.get_sharded_store()),
+    b_store(store.get_backend_store()),
     osd_state(osd_state),
     osdmap_gate("PerShardState::osdmap_gate"),
     perf(perf), recoverystate_perf(recoverystate_perf),
@@ -560,6 +560,7 @@ void OSDSingletonState::trim_maps(ceph::os::Transaction& t,
 seastar::future<Ref<PG>> ShardServices::make_pg(
   OSDMapService::cached_map_t create_map,
   spg_t pgid,
+  unsigned store_index,
   bool do_create)
 {
   using ec_profile_t = std::map<std::string, std::string>;
@@ -583,41 +584,45 @@ seastar::future<Ref<PG>> ShardServices::make_pg(
       return get_pool_info(pgid.pool());
     }
   };
-  auto get_collection = [pgid, do_create, this] {
+  auto get_collection = [pgid, do_create, store_index, this] {
     const coll_t cid{pgid};
     if (do_create) {
-      return get_store().create_new_collection(cid);
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::create_new_collection>(
+        get_store(store_index), cid);
     } else {
-      return get_store().open_collection(cid);
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+        get_store(store_index), cid);
     }
   };
   return seastar::when_all(
     std::move(get_pool_info_for_pg),
     std::move(get_collection)
-  ).then([pgid, create_map, this](auto &&ret) {
+  ).then([pgid, create_map, store_index, this](auto &&ret) {
     auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get());
     auto coll = std::move(std::get<1>(ret).get());
     return seastar::make_ready_future<Ref<PG>>(
       new PG{
-       pgid,
-       pg_shard_t{local_state.whoami, pgid.shard},
-       std::move(coll),
-       std::move(pool),
-       std::move(name),
-       create_map,
-       *this,
-       ec_profile});
+       pgid,
+       pg_shard_t{local_state.whoami, pgid.shard},
+       std::move(store_index),
+       std::move(coll),
+       std::move(pool),
+       std::move(name),
+       create_map,
+       *this,
+       ec_profile});
   });
 }
 
 seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
+  uint32_t store_index,
   std::unique_ptr<PGCreateInfo> info) {
   return seastar::do_with(
     std::move(info),
-    [this](auto &info)
+    [store_index, this](auto &info)
     -> seastar::future<Ref<PG>> {
       return get_map(info->epoch).then(
-        [&info, this](cached_map_t startmap)
+        [&info, store_index, this](cached_map_t startmap)
        -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          LOG_PREFIX(ShardServices::handle_pg_create_info);
          const spg_t &pgid = info->pgid;
@@ -659,7 +664,7 @@ seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
            }
          }
          return make_pg(
-           startmap, pgid, true
+           startmap, pgid, store_index, true
          ).then([startmap=std::move(startmap)](auto pg) mutable {
            return seastar::make_ready_future<
              std::tuple<Ref<PG>, OSDMapService::cached_map_t>
@@ -716,6 +721,7 @@ ShardServices::get_or_create_pg_ret
 ShardServices::get_or_create_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
   spg_t pgid,
+  uint32_t store_index,
   std::unique_ptr<PGCreateInfo> info)
 {
   if (info) {
@@ -724,7 +730,8 @@ ShardServices::get_or_create_pg(
     if (!existed) {
       local_state.pg_map.set_creating(pgid);
       (void)handle_pg_create_info(
-        std::move(info));
+        store_index,
+        std::move(info));
     }
     return std::move(fut);
   } else {
@@ -754,20 +761,19 @@ ShardServices::create_split_pg(
   return std::move(fut);
 }
 
-seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
-
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid, uint32_t store_index)
 {
   LOG_PREFIX(OSDSingletonState::load_pg);
   DEBUG("{}", pgid);
 
-  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
+  return seastar::do_with(PGMeta(get_store(store_index), pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
   }).then([this](epoch_t e) {
     return get_map(e);
-  }).then([pgid, this](auto&& create_map) {
-    return make_pg(std::move(create_map), pgid, false);
-  }).then([this](Ref<PG> pg) {
-    return pg->read_state(&get_store()).then([pg] {
+  }).then([pgid, store_index, this](auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, store_index, false);
+  }).then([store_index, this](Ref<PG> pg) {
+    return pg->read_state(get_store(store_index)).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
    });
  }).handle_exception([FNAME, pgid](auto ep) {
@@ -778,11 +784,13 @@ seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
 }
 
 seastar::future<> ShardServices::dispatch_context_transaction(
-  crimson::os::CollectionRef col, PeeringCtx &ctx) {
+  crimson::os::CollectionRef col, PeeringCtx &ctx, uint32_t store_index) {
   LOG_PREFIX(OSDSingletonState::dispatch_context_transaction);
   if (ctx.transaction.empty()) {
     DEBUG("empty transaction");
-    co_await get_store().flush(col);
+    co_await crimson::os::with_store_do_transaction(
+      get_store(store_index),
+      col, ceph::os::Transaction{});
     Context* on_commit(
       ceph::os::Transaction::collect_all_contexts(ctx.transaction));
     if (on_commit) {
@@ -792,7 +800,8 @@ seastar::future<> ShardServices::dispatch_context_transaction(
   }
 
   DEBUG("do_transaction ...");
-  co_await get_store().do_transaction(
+  co_await crimson::os::with_store_do_transaction(
+    get_store(store_index),
     col,
     ctx.transaction.claim_and_reset());
   co_return;
@@ -821,17 +830,18 @@ seastar::future<> ShardServices::dispatch_context_messages(
 }
 
 seastar::future<> ShardServices::dispatch_context(
+  uint32_t store_index,
   crimson::os::CollectionRef col,
   PeeringCtx &&pctx)
 {
   return seastar::do_with(
     std::move(pctx),
-    [this, col](auto &ctx) {
+    [this, col, store_index](auto &ctx) {
      ceph_assert(col || ctx.transaction.empty());
      return seastar::when_all_succeed(
        dispatch_context_messages(
          BufferedRecoveryMessages{ctx}),
-       col ? dispatch_context_transaction(col, ctx) : seastar::now()
+       col ? dispatch_context_transaction(col, ctx, store_index) : seastar::now()
      ).then_unpack([] {
        return seastar::now();
      });
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 8689a7da49a..bb60fd97c90 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -67,7 +67,7 @@ class PerShardState {
 #define assert_core() ceph_assert(seastar::this_shard_id() == core);
 
   const int whoami;
-  crimson::os::FuturizedStore::Shard &store;
+  crimson::os::BackendStore b_store;
   crimson::common::CephContext cct;
 
   OSDState &osd_state;
@@ -365,6 +365,7 @@ class ShardServices : public OSDMapService {
   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   PGShardMapping& pg_to_shard_mapping;
+  uint32_t store_shard_nums = 0;
 
   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -475,15 +476,19 @@ public:
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
     PGShardMapping& pg_to_shard_mapping,
+    uint32_t store_shard_nums,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
       osd_singleton_state(osd_singleton_state),
-      pg_to_shard_mapping(pg_to_shard_mapping) {}
+      pg_to_shard_mapping(pg_to_shard_mapping),
+      store_shard_nums(store_shard_nums) {}
 
   FORWARD_TO_OSD_SINGLETON(send_to_osd)
 
-  crimson::os::FuturizedStore::Shard &get_store() {
-    return local_state.store;
+  crimson::os::BackendStore get_store(uint32_t store_index) {
+    auto store = local_state.b_store;
+    store.store_index = store_index;
+    return store;
   }
 
   struct shard_stats_t {
@@ -493,8 +498,8 @@ public:
     return {get_reactor_utilization()};
   }
 
-  auto create_split_pg_mapping(spg_t pgid, core_id_t core) {
-    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core);
+  auto create_split_pg_mapping(spg_t pgid, core_id_t core, uint32_t store_index) {
+    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
   }
 
   auto remove_pg(spg_t pgid) {
@@ -536,8 +541,10 @@ public:
   seastar::future<Ref<PG>> make_pg(
     cached_map_t create_map,
     spg_t pgid,
+    uint32_t store_index,
     bool do_create);
   seastar::future<Ref<PG>> handle_pg_create_info(
+    uint32_t store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -545,6 +552,7 @@ public:
   get_or_create_pg_ret get_or_create_pg(
     PGMap::PGCreationBlockingEvent::TriggerI&&,
     spg_t pgid,
+    uint32_t store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -555,11 +563,11 @@ public:
     PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
     spg_t pgid);
 
-  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid, uint32_t store_index);
 
   /// Dispatch and reset ctx transaction
   seastar::future<> dispatch_context_transaction(
-    crimson::os::CollectionRef col, PeeringCtx &ctx);
+    crimson::os::CollectionRef col, PeeringCtx &ctx, uint32_t store_index);
 
   /// Dispatch and reset ctx messages
   seastar::future<> dispatch_context_messages(
@@ -567,13 +575,15 @@ public:
 
   /// Dispatch ctx and dispose of context
   seastar::future<> dispatch_context(
+    uint32_t store_index,
     crimson::os::CollectionRef col,
     PeeringCtx &&ctx);
 
   /// Dispatch ctx and dispose of ctx, transaction must be empty
   seastar::future<> dispatch_context(
+    uint32_t store_index,
     PeeringCtx &&ctx) {
-    return dispatch_context({}, std::move(ctx));
+    return dispatch_context(store_index, {}, std::move(ctx));
   }
 
   PerShardPipeline &get_client_request_pipeline() {
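
[Illustrative note, not part of the commit.] get_store(store_index) returns
the per-shard BackendStore handle by value with its store_index re-stamped,
and the .cc hunks route collection and transaction calls through
crimson::os::with_store<...>() / with_store_do_transaction() rather than
calling a single store shard directly. A minimal sketch of that dispatch
pattern, with all names (StoreShard, BackendStore, with_store) as
hypothetical stand-ins for the crimson types:

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Stand-in for FuturizedStore::Shard: one backend store shard.
struct StoreShard {
  std::string open_collection(const std::string &cid) {
    return "opened " + cid;
  }
};

// Stand-in for crimson::os::BackendStore: a small copyable handle pairing
// the shard table with a store_index, so a copy can be re-stamped with a
// different index (what get_store(store_index) does above).
struct BackendStore {
  std::vector<StoreShard> *shards;
  uint32_t store_index = 0;
};

// Stand-in for crimson::os::with_store<...>(): dispatch a member-function
// call to the shard selected by the handle's index.
template <auto Method, typename... Args>
auto with_store(BackendStore store, Args&&... args) {
  return ((*store.shards)[store.store_index].*Method)(
    std::forward<Args>(args)...);
}

int main() {
  std::vector<StoreShard> shards(2);
  BackendStore store{&shards, 0};
  store.store_index = 1;  // select the second store shard, as make_pg()
                          // would for a PG mapped there
  std::cout << with_store<&StoreShard::open_collection>(store, "pg.1_head")
            << " via store shard " << store.store_index << "\n";
  return 0;
}

The value-handle design matters because get_store() no longer hands out a
reference to one fixed Shard: each caller receives its own copy stamped with
its PG's index, so two PGs hosted on the same reactor can address different
backend store shards through the same per-shard local state.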