From a7296e5ea23f339bb7305e8ce19c95ac184e4bba Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Thu, 1 Sep 2022 23:22:59 +0000
Subject: [PATCH] crimson/osd: refactor pg management for multicore

OSDSingletonState will now only be responsible for the spg_t->core
mapping for pgs; the individual PerShardStates will hold local PGMaps.
PG management operations are now proxied from PGShardManager to
PerShardState.  Subsequent patches will shard PerShardState.

Signed-off-by: Samuel Just
---
 .../osd/osd_operations/pg_advance_map.cc |  18 +-
 .../osd/osd_operations/pg_advance_map.h  |   6 +-
 src/crimson/osd/pg_map.h                 |   3 +
 src/crimson/osd/pg_shard_manager.cc      |  66 ++++++
 src/crimson/osd/pg_shard_manager.h       | 151 +++++++++---
 src/crimson/osd/shard_services.cc        | 214 +++++++-----------
 src/crimson/osd/shard_services.h         |  82 +++----
 7 files changed, 325 insertions(+), 215 deletions(-)

diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index 140dd63fb7e..0bc7678642b 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.cc
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -6,8 +6,8 @@
 #include "include/types.h"
 #include "common/Formatter.h"
 #include "crimson/osd/pg.h"
-#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/shard_services.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "osd/PeeringState.h"
@@ -21,9 +21,9 @@ namespace {
 namespace crimson::osd {
 
 PGAdvanceMap::PGAdvanceMap(
-  PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+  ShardServices &shard_services, Ref<PG> pg, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
-  : shard_manager(shard_manager), pg(pg), from(std::nullopt), to(to),
+  : shard_services(shard_services), pg(pg), to(to),
     rctx(std::move(rctx)), do_init(do_init)
 {}
 
 PGAdvanceMap::~PGAdvanceMap() {}
@@ -71,7 +71,7 @@ seastar::future<> PGAdvanceMap::start()
       boost::make_counting_iterator(*from + 1),
       boost::make_counting_iterator(to + 1),
       [this](epoch_t next_epoch) {
-        return shard_manager.get_shard_services().get_map(next_epoch).then(
+        return shard_services.get_map(next_epoch).then(
           [this] (cached_map_t&& next_map) {
             logger().debug("{}: advancing map to {}",
                            *this, next_map->get_epoch());
@@ -81,21 +81,21 @@ seastar::future<> PGAdvanceMap::start()
             pg->handle_activate_map(rctx);
             logger().debug("{}: map activated", *this);
             if (do_init) {
-              shard_manager.pg_created(pg->get_pgid(), pg);
-              shard_manager.get_shard_services().inc_pg_num();
+              shard_services.pg_created(pg->get_pgid(), pg);
+              shard_services.inc_pg_num();
               logger().info("PGAdvanceMap::start new pg {}", *pg);
             }
             return seastar::when_all_succeed(
               pg->get_need_up_thru()
-              ? shard_manager.get_shard_services().send_alive(
+              ? shard_services.send_alive(
                  pg->get_same_interval_since())
               : seastar::now(),
-              shard_manager.get_shard_services().dispatch_context(
+              shard_services.dispatch_context(
                 pg->get_collection_ref(),
                 std::move(rctx)));
           }).then_unpack([this] {
             logger().debug("{}: sending pg temp", *this);
-            return shard_manager.get_shard_services().send_pg_temp();
+            return shard_services.send_pg_temp();
           });
       }).then([this, ref=std::move(ref)] {
         logger().debug("{}: complete", *this);
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h
index 3391dd690fb..6ae5a97bc0f 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.h
+++ b/src/crimson/osd/osd_operations/pg_advance_map.h
@@ -17,7 +17,7 @@ namespace ceph {
 
 namespace crimson::osd {
 
-class PGShardManager;
+class ShardServices;
 class PG;
 
 class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
@@ -25,7 +25,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
 
 protected:
-  PGShardManager &shard_manager;
+  ShardServices &shard_services;
   Ref<PG> pg;
   PipelineHandle handle;
 
@@ -37,7 +37,7 @@ protected:
 
 public:
   PGAdvanceMap(
-    PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+    ShardServices &shard_services, Ref<PG> pg, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
   ~PGAdvanceMap();
 
diff --git a/src/crimson/osd/pg_map.h b/src/crimson/osd/pg_map.h
index 68f85e5f556..0eabf34eb6b 100644
--- a/src/crimson/osd/pg_map.h
+++ b/src/crimson/osd/pg_map.h
@@ -65,6 +65,8 @@ public:
     pg_to_core.erase(iter);
   }
 
+  size_t get_num_pgs() const { return pg_to_core.size(); }
+
   /// Map to cores in [min_core_mapping, core_mapping_limit)
   PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit) {
     ceph_assert_always(min_core_mapping < core_mapping_limit);
@@ -72,6 +74,7 @@ public:
       core_to_num_pgs.emplace(i, 0);
     }
   }
+
 private:
   std::map<core_id_t, unsigned> core_to_num_pgs;
   std::map<spg_t, core_id_t> pg_to_core;
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 8597725eaf1..5571e2d0ee1 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -2,6 +2,13 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "crimson/osd/pg_shard_manager.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
 
 namespace crimson::osd {
 
@@ -18,6 +25,65 @@ PGShardManager::PGShardManager(
     shard_services(osd_singleton_state, local_state)
 {}
 
+seastar::future<> PGShardManager::load_pgs()
+{
+  return osd_singleton_state.store.list_collections(
+  ).then([this](auto colls) {
+    return seastar::parallel_for_each(
+      colls,
+      [this](auto coll) {
+        spg_t pgid;
+        if (coll.is_pg(&pgid)) {
+          auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+            pgid);
+          return with_remote_shard_state(
+            core,
+            [pgid](
+              PerShardState &per_shard_state,
+              ShardServices &shard_services) {
+              return shard_services.load_pg(
+                pgid
+              ).then([pgid, &per_shard_state, &shard_services](auto &&pg) {
+                logger().info("load_pgs: loaded {}", pgid);
+                per_shard_state.pg_map.pg_loaded(pgid, std::move(pg));
+                shard_services.inc_pg_num();
+                return seastar::now();
+              });
+            });
+        } else if (coll.is_temp(&pgid)) {
+          logger().warn(
+            "found temp collection on crimson osd, should be impossible: {}",
+            coll);
+          ceph_assert(0 == "temp collection on crimson osd, should be impossible");
+          return seastar::now();
+        } else {
+          logger().warn("ignoring unrecognized collection: {}", coll);
+          return seastar::now();
+        }
+      });
+  });
+}
+
+seastar::future<> PGShardManager::stop_pgs()
+{
+  return local_state.stop_pgs();
+}
+
+std::map<pg_t, pg_stat_t> PGShardManager::get_pg_stats() const
+{
+  return local_state.get_pg_stats();
+}
+
+seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
+{
+  return local_state.broadcast_map_to_pgs(
+    shard_services, epoch
+  ).then([this, epoch] {
+    osd_singleton_state.osdmap_gate.got_map(epoch);
+    return seastar::now();
+  });
+}
+
 seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
   local_state.set_up_epoch(e);
   return seastar::now();
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index e9138cd63b7..f40ea25a6f2 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -82,24 +82,120 @@ public:
 
   seastar::future<> set_up_epoch(epoch_t e);
 
-  FORWARD(pg_created, pg_created, osd_singleton_state.pg_map)
-  auto load_pgs() {
-    return osd_singleton_state.load_pgs(shard_services);
+  template <typename F>
+  auto with_remote_shard_state(core_id_t core, F &&f) {
+    ceph_assert(core == 0);
+    auto &local_state_ref = local_state;
+    auto &shard_services_ref = shard_services;
+    return seastar::smp::submit_to(
+      core,
+      [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+        return std::invoke(
+          std::move(f), local_state_ref, shard_services_ref);
+      });
+  }
+
+  /// Runs opref on the appropriate core, creating the pg as necessary.
+  template <typename T>
+  seastar::future<> run_with_pg_maybe_create(
+    typename T::IRef op
+  ) {
+    ceph_assert(op->use_count() == 1);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    static_assert(T::can_create());
+    logger.debug("{}: can_create", *op);
+
+    auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid());
+
+    local_state.registry.remove_from_registry(*op);
+    return with_remote_shard_state(
+      core,
+      [op=std::move(op)](
+        PerShardState &per_shard_state,
+        ShardServices &shard_services) mutable {
+        per_shard_state.registry.add_to_registry(*op);
+        auto &logger = crimson::get_logger(ceph_subsys_osd);
+        auto &opref = *op;
+        return opref.template with_blocking_event<
+          PGMap::PGCreationBlockingEvent
+        >([&shard_services, &opref](
+            auto &&trigger) {
+          return shard_services.get_or_create_pg(
+            std::move(trigger),
+            opref.get_pgid(), opref.get_epoch(),
+            std::move(opref.get_create_info()));
+        }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+          logger.debug("{}: have_pg", opref);
+          return opref.with_pg(shard_services, pgref);
+        }).then([op=std::move(op)] {});
+      });
+  }
+
+  /// Runs opref on the appropriate core, waiting for pg as necessary
+  template <typename T>
+  seastar::future<> run_with_pg_maybe_wait(
+    typename T::IRef op
+  ) {
+    ceph_assert(op->use_count() == 1);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    static_assert(!T::can_create());
+    logger.debug("{}: !can_create", *op);
+
+    auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid());
+
+    local_state.registry.remove_from_registry(*op);
+    return with_remote_shard_state(
+      core,
+      [op=std::move(op)](
+        PerShardState &per_shard_state,
+        ShardServices &shard_services) mutable {
+        per_shard_state.registry.add_to_registry(*op);
+        auto &logger = crimson::get_logger(ceph_subsys_osd);
+        auto &opref = *op;
+        return opref.template with_blocking_event<
+          PGMap::PGCreationBlockingEvent
+        >([&shard_services, &opref](
+            auto &&trigger) {
+          return shard_services.wait_for_pg(
+            std::move(trigger), opref.get_pgid());
+        }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+          logger.debug("{}: have_pg", opref);
+          return opref.with_pg(shard_services, pgref);
+        }).then([op=std::move(op)] {});
+      });
   }
 
-  FORWARD_TO_OSD_SINGLETON(stop_pgs)
-  FORWARD_CONST(get_pg_stats, get_pg_stats, osd_singleton_state)
-  FORWARD_CONST(for_each_pg, for_each_pg, osd_singleton_state)
-  auto get_num_pgs() const { return osd_singleton_state.pg_map.get_pgs().size(); }
+  seastar::future<> load_pgs();
+  seastar::future<> stop_pgs();
+
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
 
-  auto broadcast_map_to_pgs(epoch_t epoch) {
-    return osd_singleton_state.broadcast_map_to_pgs(
-      *this, shard_services, epoch);
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &&pg: local_state.pg_map.get_pgs()) {
+      std::apply(f, pg);
+    }
+  }
+
+  auto get_num_pgs() const {
+    return osd_singleton_state.pg_to_shard_mapping.get_num_pgs();
   }
 
+  seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
+
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    return std::invoke(std::forward<F>(f), osd_singleton_state.get_pg(pgid));
+    core_id_t core = osd_singleton_state.pg_to_shard_mapping.get_pg_mapping(
+      pgid);
+    return with_remote_shard_state(
+      core,
+      [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
+        return std::invoke(
+          std::move(f),
+          local_state.pg_map.get_pg(pgid));
+      });
   }
 
   template <typename T, typename... Args>
@@ -108,8 +204,9 @@ public:
       std::forward<Args>(args)...);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     logger.debug("{}: starting {}", *op, __func__);
-    auto &opref = *op;
 
+    auto &opref = *op;
+    auto id = op->get_id();
     auto fut = opref.template enter_stage<>(
       opref.get_connection_pipeline().await_active
     ).then([this, &opref, &logger] {
@@ -135,37 +232,17 @@ public:
       logger.debug("{}: got map {}, entering get_pg", opref, epoch);
       return opref.template enter_stage<>(
         opref.get_connection_pipeline().get_pg);
-    }).then([this, &logger, &opref] {
+    }).then([this, &logger, &opref, op=std::move(op)]() mutable {
       logger.debug("{}: in get_pg", opref);
       if constexpr (T::can_create()) {
         logger.debug("{}: can_create", opref);
-        return opref.template with_blocking_event<
-          PGMap::PGCreationBlockingEvent
-        >([this, &opref](auto &&trigger) {
-          std::ignore = this; // avoid clang warning
-          return osd_singleton_state.get_or_create_pg(
-            *this,
-            shard_services,
-            std::move(trigger),
-            opref.get_pgid(), opref.get_epoch(),
-            std::move(opref.get_create_info()));
-        });
+        return run_with_pg_maybe_create<T>(std::move(op));
       } else {
         logger.debug("{}: !can_create", opref);
-        return opref.template with_blocking_event<
-          PGMap::PGCreationBlockingEvent
-        >([this, &opref](auto &&trigger) {
-          std::ignore = this; // avoid clang warning
-          return osd_singleton_state.wait_for_pg(
-            std::move(trigger), opref.get_pgid());
-        });
+        return run_with_pg_maybe_wait<T>(std::move(op));
       }
-    }).then([this, &logger, &opref](Ref<PG> pgref) {
-      logger.debug("{}: have_pg", opref);
-      return opref.with_pg(get_shard_services(), pgref);
-    }).then([op] { /* Retain refcount on op until completion */ });
-
-    return std::make_pair(std::move(op), std::move(fut));
+    });
+    return std::make_pair(id, std::move(fut));
   }
 };
 
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 0bf4ba485d3..8975187ed3c 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -46,6 +46,49 @@ PerShardState::PerShardState(
   cct.get_perfcounters_collection()->add(recoverystate_perf);
 }
 
+seastar::future<> PerShardState::stop_pgs()
+{
+  return seastar::parallel_for_each(
+    pg_map.get_pgs(),
+    [](auto& p) {
+      return p.second->stop();
+    });
+}
+
+std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
+{
+  std::map<pg_t, pg_stat_t> ret;
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // todo: update reported_epoch,reported_seq,last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      ret.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return ret;
+}
+
+seastar::future<> PerShardState::broadcast_map_to_pgs(
+  ShardServices &shard_services,
+  epoch_t epoch)
+{
+  auto &pgs = pg_map.get_pgs();
+  return seastar::parallel_for_each(
+    pgs.begin(), pgs.end(),
+    [=, &shard_services](auto& pg) {
+      return shard_services.start_operation<PGAdvanceMap>(
+        shard_services,
+        pg.second, epoch,
+        PeeringCtx{}, false).second;
+    });
+}
+
+Ref<PG> PerShardState::get_pg(spg_t pgid)
+{
+  return pg_map.get_pg(pgid);
+}
+
 OSDSingletonState::OSDSingletonState(
   int whoami,
   crimson::net::Messenger &cluster_msgr,
@@ -371,14 +414,13 @@ seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t,
   });
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::make_pg(
-  ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::make_pg(
   OSDMapService::cached_map_t create_map,
   spg_t pgid,
   bool do_create)
 {
   using ec_profile_t = std::map<std::string, std::string>;
-  auto get_pool_info = [create_map, pgid, this] {
+  auto get_pool_info_for_pg = [create_map, pgid, this] {
     if (create_map->have_pg_pool(pgid.pool())) {
       pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
       std::string name = create_map->get_pool_name(pgid.pool());
@@ -395,51 +437,49 @@ seastar::future<Ref<PG>> OSDSingletonState::make_pg(
         std::move(ec_profile)));
     } else {
       // pool was deleted; grab final pg_pool_t off disk.
-      return get_meta_coll().load_final_pool_info(pgid.pool());
+      return get_pool_info(pgid.pool());
     }
   };
   auto get_collection = [pgid, do_create, this] {
     const coll_t cid{pgid};
     if (do_create) {
-      return store.create_new_collection(cid);
+      return get_store().create_new_collection(cid);
    } else {
-      return store.open_collection(cid);
+      return get_store().open_collection(cid);
    }
  };
  return seastar::when_all(
-    std::move(get_pool_info),
+    std::move(get_pool_info_for_pg),
    std::move(get_collection)
-  ).then([&shard_services, pgid, create_map, this] (auto&& ret) {
+  ).then([pgid, create_map, this](auto &&ret) {
    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
    auto coll = std::move(std::get<1>(ret).get0());
    return seastar::make_ready_future<Ref<PG>>(
      new PG{
        pgid,
-       pg_shard_t{whoami, pgid.shard},
+       pg_shard_t{local_state.whoami, pgid.shard},
        std::move(coll),
        std::move(pool),
        std::move(name),
        create_map,
-       shard_services,
+       *this,
        ec_profile});
  });
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
   std::unique_ptr<PGCreateInfo> info)
 {
   return seastar::do_with(
     std::move(info),
-    [this, &shard_manager, &shard_services](auto &info)
+    [this](auto &info)
     -> seastar::future<Ref<PG>> {
-      return shard_services.get_map(info->epoch).then(
-        [&info, &shard_services, this](OSDMapService::cached_map_t startmap)
-        -> seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
+      return get_map(info->epoch).then(
+        [&info, this](cached_map_t startmap)
+        -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
-           const pg_pool_t *pool = shard_services.get_map()->get_pg_pool(pool_id);
+           const pg_pool_t *pool = get_map()->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                pgid,
@@ -449,7 +489,8 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
              std::tuple<Ref<PG>, OSDMapService::cached_map_t>
            >(std::make_tuple(Ref<PG>(), startmap));
          }
-         ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+         ceph_assert(get_map()->require_osd_release >=
+                     ceph_release_t::octopus);
          if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
            // this ensures we do not process old creating messages after the
            // pool's initial pgs have been created (and pg are subsequently
@@ -463,13 +504,14 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
              >(std::make_tuple(Ref<PG>(), startmap));
            }
          }
-         return make_pg(shard_services, startmap, pgid, true).then(
-           [startmap=std::move(startmap)](auto pg) mutable {
-             return seastar::make_ready_future<
-               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
-             >(std::make_tuple(std::move(pg), std::move(startmap)));
-           });
-       }).then([this, &shard_manager, &shard_services, &info](auto&& ret)
+         return make_pg(
+           startmap, pgid, true
+         ).then([startmap=std::move(startmap)](auto pg) mutable {
+           return seastar::make_ready_future<
+             std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+           >(std::make_tuple(std::move(pg), std::move(startmap)));
+         });
+       }).then([this, &info](auto &&ret)
         ->seastar::future<Ref<PG>> {
         auto [pg, startmap] = std::move(ret);
         if (!pg)
@@ -482,7 +524,7 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
           info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
 
         int role = startmap->calc_pg_role(
-          pg_shard_t(whoami, info->pgid.shard),
+          pg_shard_t(local_state.whoami, info->pgid.shard),
           acting);
 
         PeeringCtx rctx;
@@ -505,10 +547,10 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
           info->past_intervals,
           rctx.transaction);
 
-        return shard_services.start_operation<PGAdvanceMap>(
-          shard_manager, pg, osdmap->get_epoch(), std::move(rctx), true
+        return start_operation<PGAdvanceMap>(
+          *this, pg, get_map()->get_epoch(), std::move(rctx), true
         ).second.then([pg=pg] {
-          return seastar::make_ready_future<Ref<PG>>(pg);
+          return seastar::make_ready_future<Ref<PG>>(pg);
         });
       });
   });
 }
 
 seastar::future<Ref<PG>>
-OSDSingletonState::get_or_create_pg(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
+ShardServices::get_or_create_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
   spg_t pgid,
   epoch_t epoch,
   std::unique_ptr<PGCreateInfo> info)
 {
   if (info) {
-    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
+    auto [fut, creating] = local_state.pg_map.wait_for_pg(
+      std::move(trigger), pgid);
     if (!creating) {
-      pg_map.set_creating(pgid);
+      local_state.pg_map.set_creating(pgid);
       (void)handle_pg_create_info(
-        shard_manager, shard_services, std::move(info));
+        std::move(info));
     }
     return std::move(fut);
   } else {
-    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
+    return seastar::make_ready_future<Ref<PG>>(
+      local_state.pg_map.get_pg(pgid));
   }
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::wait_for_pg(
+seastar::future<Ref<PG>> ShardServices::wait_for_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
 {
-  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
-}
-
-Ref<PG> OSDSingletonState::get_pg(spg_t pgid)
-{
-  return pg_map.get_pg(pgid);
+  return local_state.pg_map.wait_for_pg(std::move(trigger), pgid).first;
 }
 
-seastar::future<> OSDSingletonState::load_pgs(
-  ShardServices &shard_services)
-{
-  return store.list_collections(
-  ).then([this, &shard_services](auto colls) {
-    return seastar::parallel_for_each(
-      colls,
-      [this, &shard_services](auto coll) {
-        spg_t pgid;
-        if (coll.is_pg(&pgid)) {
-          return load_pg(
-            shard_services,
-            pgid
-          ).then([pgid, this, &shard_services](auto &&pg) {
-            logger().info("load_pgs: loaded {}", pgid);
-            pg_map.pg_loaded(pgid, std::move(pg));
-            shard_services.inc_pg_num();
-            return seastar::now();
-          });
-        } else if (coll.is_temp(&pgid)) {
-          logger().warn(
-            "found temp collection on crimson osd, should be impossible: {}",
-            coll);
-          ceph_assert(0 == "temp collection on crimson osd, should be impossible");
-          return seastar::now();
-        } else {
-          logger().warn("ignoring unrecognized collection: {}", coll);
-          return seastar::now();
-        }
-      });
-  });
-}
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
 
-seastar::future<Ref<PG>> OSDSingletonState::load_pg(
-  ShardServices &shard_services,
-  spg_t pgid)
 {
   logger().debug("{}: {}", __func__, pgid);
 
-  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
+  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
-  }).then([&shard_services](epoch_t e) {
-    return shard_services.get_map(e);
+  }).then([this](epoch_t e) {
+    return get_map(e);
-  }).then([pgid, this, &shard_services] (auto&& create_map) {
-    return make_pg(shard_services, std::move(create_map), pgid, false);
+  }).then([pgid, this](auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, false);
   }).then([this](Ref<PG> pg) {
-    return pg->read_state(&store).then([pg] {
+    return pg->read_state(&get_store()).then([pg] {
       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
     });
   }).handle_exception([pgid](auto ep) {
@@ -604,47 +607,6 @@ seastar::future<Ref<PG>> OSDSingletonState::load_pg(
   });
 }
 
-seastar::future<> OSDSingletonState::stop_pgs()
-{
-  return seastar::parallel_for_each(
-    pg_map.get_pgs(),
-    [](auto& p) {
-      return p.second->stop();
-    });
-}
-
-std::map<pg_t, pg_stat_t> OSDSingletonState::get_pg_stats() const
-{
-  std::map<pg_t, pg_stat_t> ret;
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
-    if (pg->is_primary()) {
-      auto stats = pg->get_stats();
-      // todo: update reported_epoch,reported_seq,last_fresh
-      stats.reported_epoch = osdmap->get_epoch();
-      ret.emplace(pgid.pgid, std::move(stats));
-    }
-  }
-  return ret;
-}
-
-seastar::future<> OSDSingletonState::broadcast_map_to_pgs(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
-  epoch_t epoch)
-{
-  auto &pgs = pg_map.get_pgs();
-  return seastar::parallel_for_each(
-    pgs.begin(), pgs.end(),
-    [=, &shard_manager, &shard_services](auto& pg) {
-      return shard_services.start_operation<PGAdvanceMap>(
-        shard_manager, pg.second, epoch, PeeringCtx{}, false
-      ).second;
-    }).then([epoch, this] {
-      osdmap_gate.got_map(epoch);
-      return seastar::make_ready_future();
-    });
-}
-
 seastar::future<> ShardServices::dispatch_context_transaction(
   crimson::os::CollectionRef col, PeeringCtx &ctx) {
   if (ctx.transaction.empty()) {
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index bc68f290f84..29abcd2f7a3 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -90,6 +90,23 @@ class PerShardState {
     return registry.stop();
   }
 
+  // PGMap state
+  PGMap pg_map;
+
+  seastar::future<> stop_pgs();
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
+  seastar::future<> broadcast_map_to_pgs(
+    ShardServices &shard_services,
+    epoch_t epoch);
+
+  Ref<PG> get_pg(spg_t pgid);
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &pg : pg_map.get_pgs()) {
+      std::invoke(f, pg.first, pg.second);
+    }
+  }
+
   template <typename T, typename... Args>
   auto start_operation(Args&&... args) {
     if (__builtin_expect(stopping, false)) {
@@ -173,6 +190,10 @@ public:
     return *meta_coll;
   }
 
+  auto get_pool_info(int64_t poolid) {
+    return get_meta_coll().load_final_pool_info(poolid);
+  }
+
   // global pg temp state
   struct pg_temp_t {
     std::vector<int> acting;
     bool forced = false;
@@ -188,6 +209,8 @@ public:
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();
 
+  // TODO: add config to control mapping
+  PGShardMapping pg_to_shard_mapping{0, 1};
   unsigned num_pgs = 0;
   unsigned get_pg_num() const {
     return num_pgs;
@@ -238,46 +261,6 @@ public:
     epoch_t e, bufferlist&& bl);
   seastar::future<> store_maps(ceph::os::Transaction& t,
                                epoch_t start, Ref<MOSDMap> m);
-
-  // PGMap state
-  PGMap pg_map;
-
-  seastar::future<Ref<PG>> make_pg(
-    ShardServices &shard_services,
-    cached_map_t create_map,
-    spg_t pgid,
-    bool do_create);
-  seastar::future<Ref<PG>> handle_pg_create_info(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> get_or_create_pg(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    PGMap::PGCreationBlockingEvent::TriggerI&&,
-    spg_t pgid,
-    epoch_t epoch,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> wait_for_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
-  Ref<PG> get_pg(spg_t pgid);
-  seastar::future<> load_pgs(ShardServices &shard_services);
-  seastar::future<Ref<PG>> load_pg(
-    ShardServices &shard_services,
-    spg_t pgid);
-  seastar::future<> stop_pgs();
-  std::map<pg_t, pg_stat_t> get_pg_stats() const;
-  seastar::future<> broadcast_map_to_pgs(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    epoch_t epoch);
-
-  template <typename F>
-  void for_each_pg(F &&f) const {
-    for (auto &pg : pg_map.get_pgs()) {
-      std::invoke(f, pg.first, pg.second);
-    }
-  }
 };
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
@@ -341,6 +324,22 @@ public:
     return *local_state.perf;
   }
 
+  // Local PG Management
+  seastar::future<Ref<PG>> make_pg(
+    cached_map_t create_map,
+    spg_t pgid,
+    bool do_create);
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> get_or_create_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&,
+    spg_t pgid,
+    epoch_t epoch,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> wait_for_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+
   /// Dispatch and reset ctx transaction
   seastar::future<> dispatch_context_transaction(
     crimson::os::CollectionRef col, PeeringCtx &ctx);
@@ -376,6 +375,7 @@ public:
     });
   }
 
+  FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD_TO_OSD_SINGLETON(get_pg_num)
 
   FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
@@ -392,6 +392,8 @@ public:
   FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
   FORWARD_TO_OSD_SINGLETON(get_hb_stamps)
 
+  FORWARD(pg_created, pg_created, local_state.pg_map)
+
   FORWARD(
     maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry)
   FORWARD(
-- 
2.39.5
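
For readers skimming the diff, the routing pattern it introduces is: OSDSingletonState keeps only the spg_t -> core assignment (PGShardMapping), PGShardManager looks up the core and proxies the call to that core, and each PerShardState owns a PGMap that is only ever touched from its own shard. Below is a minimal, standalone C++ sketch of that shape, not crimson code: ShardMapping, Manager, run_with_pg and the string "work" payloads are made-up stand-ins, the stand-in with_remote_shard_state runs synchronously where the real PGShardManager::with_remote_shard_state uses seastar::smp::submit_to() and returns futures, and the least-loaded-core choice is only an assumed placement policy.

#include <array>
#include <functional>
#include <iostream>
#include <map>
#include <string>

using core_id_t = unsigned;
using spg_t = std::string;  // stand-in for the real spg_t key type

// Stand-in for the per-shard PGMap owned by each PerShardState.
struct PerShardState {
  std::map<spg_t, std::string> pg_map;
};

// Models the singleton spg_t -> core table: the only cross-core PG state.
class ShardMapping {
  std::map<core_id_t, unsigned> core_to_num_pgs;
  std::map<spg_t, core_id_t> pg_to_core;
public:
  explicit ShardMapping(core_id_t cores) {
    for (core_id_t i = 0; i < cores; ++i) {
      core_to_num_pgs.emplace(i, 0);
    }
  }
  // Assign the pg to the least-loaded core on first use, remember it after.
  core_id_t maybe_create_pg(const spg_t &pgid) {
    if (auto found = pg_to_core.find(pgid); found != pg_to_core.end()) {
      return found->second;
    }
    auto best = core_to_num_pgs.begin();
    for (auto it = core_to_num_pgs.begin(); it != core_to_num_pgs.end(); ++it) {
      if (it->second < best->second) {
        best = it;
      }
    }
    ++best->second;
    pg_to_core.emplace(pgid, best->first);
    return best->first;
  }
};

class Manager {
  ShardMapping mapping{2};
  std::array<PerShardState, 2> shards;  // one PerShardState per "core"
public:
  // Synchronous stand-in for seastar::smp::submit_to(core, ...): run f
  // against the state owned by the target core.
  template <typename F>
  auto with_remote_shard_state(core_id_t core, F &&f) {
    return std::invoke(std::forward<F>(f), shards.at(core));
  }

  void run_with_pg(const spg_t &pgid, const std::string &work) {
    const auto core = mapping.maybe_create_pg(pgid);
    with_remote_shard_state(core, [&](PerShardState &local) {
      local.pg_map[pgid] = work;  // per-shard map, touched only via its core
      std::cout << pgid << " handled on core " << core << "\n";
    });
  }
};

int main() {
  Manager m;
  m.run_with_pg("1.0", "peering");
  m.run_with_pg("1.1", "client io");
}

The design point the sketch tries to make visible: because each pg_map has exactly one owning shard, operations routed through the mapping never need locks on the per-shard structures; only the small pgid-to-core table remains shared, which is what lets later patches shard PerShardState itself.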