From b09adae76b49c7719ea12026aae83c12dea86e97 Mon Sep 17 00:00:00 2001
From: Samuel Just <sjust@redhat.com>
Date: Fri, 8 Jul 2022 06:16:25 +0000
Subject: [PATCH] crimson/osd: move pg_map and associated state to CoreState

Signed-off-by: Samuel Just <sjust@redhat.com>
---
 src/crimson/admin/pg_commands.cc              |  27 +-
 src/crimson/osd/osd.cc                        | 276 ++---
 src/crimson/osd/osd.h                         |  31 +-
 .../osd/osd_operations/pg_advance_map.cc      |  23 +-
 .../osd/osd_operations/pg_advance_map.h       |   6 +-
 src/crimson/osd/pg_shard_manager.h            |  22 ++
 src/crimson/osd/shard_services.cc             | 277 ++++++
 src/crimson/osd/shard_services.h              |  43 +++
 8 files changed, 407 insertions(+), 298 deletions(-)

diff --git a/src/crimson/admin/pg_commands.cc b/src/crimson/admin/pg_commands.cc
index dacfd515db481..033382a0db88d 100644
--- a/src/crimson/admin/pg_commands.cc
+++ b/src/crimson/admin/pg_commands.cc
@@ -56,16 +56,23 @@ public:
       return seastar::make_ready_future<tell_result_t>(tell_result_t{
        -ENOENT, fmt::format("pgid '{}' does not exist", pgid_str)});
     }
-    Ref<PG> pg = osd.get_pg(spg_id);
-    if (!pg) {
-      return seastar::make_ready_future<tell_result_t>(tell_result_t{
-        -ENOENT, fmt::format("i don't have pgid '{}'", spg_id)});
-    }
-    if (!pg->is_primary()) {
-      return seastar::make_ready_future<tell_result_t>(tell_result_t{
-        -EAGAIN, fmt::format("not primary for pgid '{}'", spg_id)});
-    }
-    return this->do_command(pg, cmdmap, format, std::move(input));
+    return osd.get_pg_shard_manager().with_pg(
+      spg_id,
+      [this, spg_id,
+       cmdmap=std::move(cmdmap),
+       format=std::move(format),
+       input=std::move(input)
+      ](auto &&pg) mutable {
+        if (!pg) {
+          return seastar::make_ready_future<tell_result_t>(tell_result_t{
+            -ENOENT, fmt::format("i don't have pgid '{}'", spg_id)});
+        }
+        if (!pg->is_primary()) {
+          return seastar::make_ready_future<tell_result_t>(tell_result_t{
+            -EAGAIN, fmt::format("not primary for pgid '{}'", spg_id)});
+        }
+        return this->do_command(pg, cmdmap, format, std::move(input));
+      });
   }
 
 private:
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index bf122c017d583..28a939d24d6d8 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -359,7 +359,7 @@ seastar::future<> OSD::start()
       pg_shard_manager.got_map(map->get_epoch());
       osdmap = std::move(map);
       bind_epoch = osdmap->get_epoch();
-      return load_pgs();
+      return pg_shard_manager.load_pgs();
     }).then([this] {
       uint64_t osd_required =
@@ -613,10 +613,7 @@ seastar::future<> OSD::stop()
   }).then([this] {
     return store.stop();
   }).then([this] {
-    return seastar::parallel_for_each(pg_map.get_pgs(),
-      [](auto& p) {
-        return p.second->stop();
-      });
+    return pg_shard_manager.stop_pgs();
   }).then([this] {
     return monc->stop();
   }).then([this] {
@@ -641,121 +638,29 @@ void OSD::dump_status(Formatter* f) const
   f->dump_string("state", pg_shard_manager.get_osd_state_string());
   f->dump_unsigned("oldest_map", superblock.oldest_map);
   f->dump_unsigned("newest_map", superblock.newest_map);
-  f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
+  f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
 }
 
 void OSD::dump_pg_state_history(Formatter* f) const
 {
   f->open_array_section("pgs");
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
+  pg_shard_manager.for_each_pg([f](auto &pgid, auto &pg) {
     f->open_object_section("pg");
     f->dump_stream("pg") << pgid;
     const auto& peering_state = pg->get_peering_state();
     f->dump_string("currently", peering_state.get_current_state());
     peering_state.dump_history(f);
     f->close_section();
-  }
+  });
   f->close_section();
 }
 
 void OSD::print(std::ostream& out) const
 {
   out << "{osd." << superblock.whoami << " "
-      << superblock.osd_fsid << " [" << superblock.oldest_map
-      << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
-      << " pgs}";
-}
-
-seastar::future<> OSD::load_pgs()
-{
-  return store.list_collections().then([this](auto colls) {
-    return seastar::parallel_for_each(colls, [this](auto coll) {
-      spg_t pgid;
-      if (coll.is_pg(&pgid)) {
-        return load_pg(pgid).then([pgid, this](auto&& pg) {
-          logger().info("load_pgs: loaded {}", pgid);
-          pg_map.pg_loaded(pgid, std::move(pg));
-          shard_services.inc_pg_num();
-          return seastar::now();
-        });
-      } else if (coll.is_temp(&pgid)) {
-        // TODO: remove the collection
-        return seastar::now();
-      } else {
-        logger().warn("ignoring unrecognized collection: {}", coll);
-        return seastar::now();
-      }
-    });
-  });
-}
-
-seastar::future<Ref<PG>> OSD::make_pg(OSDMapService::cached_map_t create_map,
-                                      spg_t pgid,
-                                      bool do_create)
-{
-  using ec_profile_t = map<string,string>;
-  auto get_pool_info = [create_map, pgid, this] {
-    if (create_map->have_pg_pool(pgid.pool())) {
-      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
-      string name = create_map->get_pool_name(pgid.pool());
-      ec_profile_t ec_profile;
-      if (pi.is_erasure()) {
-        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
-      }
-      return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
-        std::make_tuple(std::move(pi),
-                        std::move(name),
-                        std::move(ec_profile)));
-    } else {
-      // pool was deleted; grab final pg_pool_t off disk.
-      return pg_shard_manager.get_meta_coll().load_final_pool_info(pgid.pool());
-    }
-  };
-  auto get_collection = [pgid, do_create, this] {
-    const coll_t cid{pgid};
-    if (do_create) {
-      return store.create_new_collection(cid);
-    } else {
-      return store.open_collection(cid);
-    }
-  };
-  return seastar::when_all(
-    std::move(get_pool_info),
-    std::move(get_collection)
-  ).then([pgid, create_map, this] (auto&& ret) {
-    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
-    auto coll = std::move(std::get<1>(ret).get0());
-    return seastar::make_ready_future<Ref<PG>>(
-      new PG{pgid,
-             pg_shard_t{whoami, pgid.shard},
-             std::move(coll),
-             std::move(pool),
-             std::move(name),
-             create_map,
-             shard_services,
-             ec_profile});
-  });
-}
-
-seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
-{
-  logger().debug("{}: {}", __func__, pgid);
-
-  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
-    return pg_meta.get_epoch();
-  }).then([this](epoch_t e) {
-    return pg_shard_manager.get_map(e);
-  }).then([pgid, this] (auto&& create_map) {
-    return make_pg(std::move(create_map), pgid, false);
-  }).then([this](Ref<PG> pg) {
-    return pg->read_state(&store).then([pg] {
-      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
-    });
-  }).handle_exception([pgid](auto ep) {
-    logger().info("pg {} saw exception on load {}", pgid, ep);
-    ceph_abort("Could not load pg" == 0);
-    return seastar::make_exception_future<Ref<PG>>(ep);
-  });
+      << superblock.osd_fsid << " [" << superblock.oldest_map
+      << "," << superblock.newest_map << "] " << pg_shard_manager.get_num_pgs()
+      << " pgs}";
 }
 
 std::optional<seastar::future<>>
@@ -887,18 +792,10 @@ void OSD::update_stats()
 
 MessageURef OSD::get_stats() const
 {
-  // todo: m-to-n: collect stats using map-reduce
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
-    if (pg->is_primary()) {
-      auto stats = pg->get_stats();
-      // todo: update reported_epoch,reported_seq,last_fresh
-      stats.reported_epoch = osdmap->get_epoch();
-      m->pg_stat.emplace(pgid.pgid, std::move(stats));
-    }
-  }
+  m->pg_stat = pg_shard_manager.get_pg_stats();
   return m;
 }
 
@@ -921,88 +818,6 @@ bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
   return true;
 }
 
-seastar::future<Ref<PG>> OSD::handle_pg_create_info(
-  std::unique_ptr<PGCreateInfo> info) {
-  return seastar::do_with(
-    std::move(info),
-    [this](auto &info) -> seastar::future<Ref<PG>> {
-      return pg_shard_manager.get_map(info->epoch).then(
-        [&info, this](OSDMapService::cached_map_t startmap) ->
-        seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
-          const spg_t &pgid = info->pgid;
-          if (info->by_mon) {
-            int64_t pool_id = pgid.pgid.pool();
-            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
-            if (!pool) {
-              logger().debug(
-                "{} ignoring pgid {}, pool dne",
-                __func__,
-                pgid);
-              return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(Ref<PG>(), startmap));
-            }
-            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
-            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
-              // this ensures we do not process old creating messages after the
-              // pool's initial pgs have been created (and pg are subsequently
-              // allowed to split or merge).
-              logger().debug(
-                "{} dropping {} create, pool does not have CREATING flag set",
-                __func__,
-                pgid);
-              return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(Ref<PG>(), startmap));
-            }
-          }
-          return make_pg(startmap, pgid, true).then(
-            [startmap=std::move(startmap)](auto pg) mutable {
-              return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(std::move(pg), std::move(startmap)));
-            });
-        }).then([this, &info](auto&& ret) ->
-          seastar::future<Ref<PG>> {
-          auto [pg, startmap] = std::move(ret);
-          if (!pg)
-            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
-          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
-
-          int up_primary, acting_primary;
-          vector<int> up, acting;
-          startmap->pg_to_up_acting_osds(
-            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
-
-          int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
-                                            acting);
-
-          PeeringCtx rctx;
-          create_pg_collection(
-            rctx.transaction,
-            info->pgid,
-            info->pgid.get_split_bits(pp->get_pg_num()));
-          init_pg_ondisk(
-            rctx.transaction,
-            info->pgid,
-            pp);
-
-          pg->init(
-            role,
-            up,
-            up_primary,
-            acting,
-            acting_primary,
-            info->history,
-            info->past_intervals,
-            rctx.transaction);
-
-          return shard_services.start_operation<PGAdvanceMap>(
-            *this, pg, pg->get_osdmap_epoch(),
-            osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
-              return seastar::make_ready_future<Ref<PG>>(pg);
-            });
-        });
-    });
-}
-
 seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                       Ref<MOSDMap> m)
 {
@@ -1118,7 +933,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     }
     return check_osdmap_features().then([this] {
       // yay!
-      return consume_map(osdmap->get_epoch());
+      return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
     });
   }).then([m, this] {
     if (pg_shard_manager.is_active()) {
@@ -1221,14 +1036,17 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
 seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                                            Ref<MOSDRepOpReply> m)
 {
-  const auto& pgs = pg_map.get_pgs();
-  if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
-    m->finish_decode();
-    pg->second->handle_rep_op_reply(conn, *m);
-  } else {
-    logger().warn("stale reply: {}", *m);
-  }
-  return seastar::now();
+  return pg_shard_manager.with_pg(
+    m->get_spg(),
+    [conn=std::move(conn), m=std::move(m)](auto &&pg) {
+      if (pg) {
+        m->finish_decode();
+        pg->handle_rep_op_reply(conn, *m);
+      } else {
+        logger().warn("stale reply: {}", *m);
+      }
+      return seastar::now();
+    });
 }
 
 seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
@@ -1329,9 +1147,10 @@ void OSD::update_heartbeat_peers()
   if (!pg_shard_manager.is_active()) {
     return;
   }
-  for (auto& pg : pg_map.get_pgs()) {
+
+  pg_shard_manager.for_each_pg([this](auto &pgid, auto &pg) {
     vector<int> up, acting;
-    osdmap->pg_to_up_acting_osds(pg.first.pgid,
+    osdmap->pg_to_up_acting_osds(pgid.pgid,
                                  &up, nullptr,
                                  &acting, nullptr);
     for (int osd : boost::join(up, acting)) {
@@ -1341,7 +1160,7 @@ void OSD::update_heartbeat_peers()
       heartbeat->add_peer(osd, osdmap->get_epoch());
     }
   }
-  }
+  });
   heartbeat->update_peers(whoami);
 }
@@ -1367,51 +1186,6 @@ seastar::future<> OSD::check_osdmap_features()
       stringify((int)osdmap->require_osd_release));
 }
 
-seastar::future<> OSD::consume_map(epoch_t epoch)
-{
-  // todo: m-to-n: broadcast this news to all shards
-  auto &pgs = pg_map.get_pgs();
-  return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
-    return shard_services.start_operation<PGAdvanceMap>(
-      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
-      PeeringCtx{}, false).second;
-  }).then([epoch, this] {
-    pg_shard_manager.got_map(epoch);
-    return seastar::make_ready_future();
-  });
-}
-
-
-seastar::future<Ref<PG>>
-OSD::get_or_create_pg(
-  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
-  spg_t pgid,
-  epoch_t epoch,
-  std::unique_ptr<PGCreateInfo> info)
-{
-  if (info) {
-    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
-    if (!creating) {
-      pg_map.set_creating(pgid);
-      (void)handle_pg_create_info(std::move(info));
-    }
-    return std::move(fut);
-  } else {
-    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
-  }
-}
-
-seastar::future<Ref<PG>> OSD::wait_for_pg(
-  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
-{
-  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
-}
-
-Ref<PG> OSD::get_pg(spg_t pgid)
-{
-  return pg_map.get_pg(pgid);
-}
-
 seastar::future<> OSD::prepare_to_stop()
 {
   if (osdmap && osdmap->is_up(whoami)) {
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index b70f2ccdc9cbb..ca7e59a62afb8 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -146,12 +146,6 @@ private:
   seastar::future<> _send_boot();
   seastar::future<> _add_me_to_crush();
 
-  seastar::future<Ref<PG>> make_pg(OSDMapService::cached_map_t create_map,
-                                   spg_t pgid,
-                                   bool do_create);
-  seastar::future<Ref<PG>> load_pg(spg_t pgid);
-  seastar::future<> load_pgs();
-
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
   void write_superblock(ceph::os::Transaction& t);
@@ -159,9 +153,6 @@ private:
 
   bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
 
-  seastar::future<Ref<PG>> handle_pg_create_info(
-    std::unique_ptr<PGCreateInfo> info);
-
   seastar::future<> handle_osd_map(crimson::net::ConnectionRef conn,
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
                                  Ref<MOSDOp> m);
@@ -194,14 +185,14 @@ private:
     crimson::net::ConnectionRef conn,
     Ref<MOSDMarkMeDown> m);
 public:
+  auto &get_pg_shard_manager() {
+    return pg_shard_manager;
+  }
   ShardServices &get_shard_services() {
     return pg_shard_manager.get_shard_services();
   }
 
-  seastar::future<> consume_map(epoch_t epoch);
-
 private:
-  PGMap pg_map;
   crimson::common::Gated gate;
 
   seastar::promise<> stop_acked;
@@ -215,16 +206,7 @@ private:
   void update_heartbeat_peers();
 
   friend class PGAdvanceMap;
-  seastar::future<Ref<PG>> get_or_create_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&,
-    spg_t pgid,
-    epoch_t epoch,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> wait_for_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
-
 public:
-  Ref<PG> get_pg(spg_t pgid);
   seastar::future<> send_beacon();
 
   template <typename T, typename... Args>
@@ -268,7 +250,9 @@ public:
           PGMap::PGCreationBlockingEvent
         >([this, &opref](auto &&trigger) {
           std::ignore = this; // avoid clang warning
-          return get_or_create_pg(
+          return pg_shard_manager.get_or_create_pg(
+            pg_shard_manager,
+            pg_shard_manager.get_shard_services(),
             std::move(trigger),
             opref.get_pgid(), opref.get_epoch(),
             std::move(opref.get_create_info()));
@@ -279,7 +263,8 @@ public:
           PGMap::PGCreationBlockingEvent
         >([this, &opref](auto &&trigger) {
           std::ignore = this; // avoid clang warning
-          return wait_for_pg(std::move(trigger), opref.get_pgid());
+          return pg_shard_manager.wait_for_pg(
+            std::move(trigger), opref.get_pgid());
         });
       }
     }).then([this, &logger, &opref](Ref<PG> pgref) {
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index d435bc0fe2482..51b279c7edb8b 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.cc
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -7,7 +7,7 @@
 #include "include/types.h"
 #include "common/Formatter.h"
 #include "crimson/osd/pg.h"
-#include "crimson/osd/osd.h"
+#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "osd/PeeringState.h"
@@ -21,9 +21,9 @@ namespace {
 namespace crimson::osd {
 
 PGAdvanceMap::PGAdvanceMap(
-  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+  PGShardManager &shard_manager, Ref<PG> pg, epoch_t from, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
-  : osd(osd), pg(pg), from(from), to(to),
+  : shard_manager(shard_manager), pg(pg), from(from), to(to),
     rctx(std::move(rctx)), do_init(do_init)
 {}
 
 PGAdvanceMap::~PGAdvanceMap() {}
@@ -68,7 +68,7 @@ seastar::future<> PGAdvanceMap::start()
         boost::make_counting_iterator(from + 1),
         boost::make_counting_iterator(to + 1),
         [this](epoch_t next_epoch) {
-          return osd.pg_shard_manager.get_map(next_epoch).then(
+          return shard_manager.get_map(next_epoch).then(
            [this] (cached_map_t&& next_map) {
              pg->handle_advance_map(next_map, rctx);
            });
@@ -76,19 +76,20 @@ seastar::future<> PGAdvanceMap::start()
       pg->handle_activate_map(rctx);
       handle.exit();
       if (do_init) {
-        osd.pg_map.pg_created(pg->get_pgid(), pg);
-        osd.shard_services.inc_pg_num();
+        shard_manager.pg_created(pg->get_pgid(), pg);
+        shard_manager.get_shard_services().inc_pg_num();
         logger().info("PGAdvanceMap::start new pg {}", *pg);
       }
       return seastar::when_all_succeed(
-        pg->get_need_up_thru() \
-        ? osd.shard_services.send_alive(pg->get_same_interval_since())
-        : seastar::now(),
-        osd.shard_services.dispatch_context(
+        pg->get_need_up_thru()
+        ? shard_manager.get_shard_services().send_alive(
+            pg->get_same_interval_since())
+        : seastar::now(),
+        shard_manager.get_shard_services().dispatch_context(
          pg->get_collection_ref(),
          std::move(rctx)));
     }).then_unpack([this] {
-      return osd.shard_services.send_pg_temp();
+      return shard_manager.get_shard_services().send_pg_temp();
     });
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h
index 1ec23029b2d97..51789a3a80270 100644
--- a/src/crimson/osd/osd_operations/pg_advance_map.h
+++ b/src/crimson/osd/osd_operations/pg_advance_map.h
@@ -17,7 +17,7 @@ namespace ceph {
 
 namespace crimson::osd {
 
-class OSD;
+class PGShardManager;
 class PG;
 
 class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
@@ -25,7 +25,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
 
 protected:
-  OSD &osd;
+  PGShardManager &shard_manager;
   Ref<PG> pg;
   PipelineHandle handle;
 
@@ -37,7 +37,7 @@ protected:
 
 public:
   PGAdvanceMap(
-    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+    PGShardManager &shard_manager, Ref<PG> pg, epoch_t from, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
   ~PGAdvanceMap();
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 6d54c6b4f3ba9..1f759ac97b7a4 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -76,6 +76,28 @@ public:
   FORWARD_TO_CORE(store_maps)
   FORWARD_TO_CORE(get_up_epoch)
   FORWARD_TO_CORE(set_up_epoch)
+
+  FORWARD(pg_created, pg_created, core_state.pg_map)
+  auto load_pgs() {
+    return core_state.load_pgs(shard_services);
+  }
+  FORWARD_TO_CORE(stop_pgs)
+  FORWARD_CONST(get_pg_stats, get_pg_stats, core_state)
+
+  FORWARD_TO_CORE(get_or_create_pg)
+  FORWARD_TO_CORE(wait_for_pg)
+  FORWARD_CONST(for_each_pg, for_each_pg, core_state)
+  auto get_num_pgs() const { return core_state.pg_map.get_pgs().size(); }
+
+  auto broadcast_map_to_pgs(epoch_t epoch) {
+    return core_state.broadcast_map_to_pgs(
+      *this, shard_services, epoch);
+  }
+
+  template <typename F>
+  auto with_pg(spg_t pgid, F &&f) {
+    return std::invoke(std::forward<F>(f), core_state.get_pg(pgid));
+  }
 };
 
 }
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 7fcbc24729719..52dcda071b69e 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -19,6 +19,9 @@
 #include "crimson/net/Connection.h"
 #include "crimson/os/cyanstore/cyan_store.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_meta.h"
 
 namespace {
   seastar::logger& logger() {
@@ -366,6 +369,280 @@ seastar::future<> CoreState::store_maps(ceph::os::Transaction& t,
   });
 }
 
+seastar::future<Ref<PG>> CoreState::make_pg(
+  ShardServices &shard_services,
+  OSDMapService::cached_map_t create_map,
+  spg_t pgid,
+  bool do_create)
+{
+  using ec_profile_t = std::map<std::string, std::string>;
+  auto get_pool_info = [create_map, pgid, this] {
+    if (create_map->have_pg_pool(pgid.pool())) {
+      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
+      std::string name = create_map->get_pool_name(pgid.pool());
+      ec_profile_t ec_profile;
+      if (pi.is_erasure()) {
+        ec_profile = create_map->get_erasure_code_profile(
+          pi.erasure_code_profile);
+      }
+      return seastar::make_ready_future<
+        std::tuple<pg_pool_t, std::string, ec_profile_t>
+      >(std::make_tuple(
+          std::move(pi),
+          std::move(name),
+          std::move(ec_profile)));
+    } else {
+      // pool was deleted; grab final pg_pool_t off disk.
+      return get_meta_coll().load_final_pool_info(pgid.pool());
+    }
+  };
+  auto get_collection = [pgid, do_create, this] {
+    const coll_t cid{pgid};
+    if (do_create) {
+      return store.create_new_collection(cid);
+    } else {
+      return store.open_collection(cid);
+    }
+  };
+  return seastar::when_all(
+    std::move(get_pool_info),
+    std::move(get_collection)
+  ).then([&shard_services, pgid, create_map, this] (auto&& ret) {
+    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
+    auto coll = std::move(std::get<1>(ret).get0());
+    return seastar::make_ready_future<Ref<PG>>(
+      new PG{
+        pgid,
+        pg_shard_t{whoami, pgid.shard},
+        std::move(coll),
+        std::move(pool),
+        std::move(name),
+        create_map,
+        shard_services,
+        ec_profile});
+  });
+}
+
+seastar::future<Ref<PG>> CoreState::handle_pg_create_info(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  std::unique_ptr<PGCreateInfo> info) {
+  return seastar::do_with(
+    std::move(info),
+    [this, &shard_manager, &shard_services](auto &info) -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+        [&info, &shard_services, this](
+          OSDMapService::cached_map_t startmap) ->
+        seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
+          const spg_t &pgid = info->pgid;
+          if (info->by_mon) {
+            int64_t pool_id = pgid.pgid.pool();
+            const pg_pool_t *pool = get_osdmap()->get_pg_pool(pool_id);
+            if (!pool) {
+              logger().debug(
+                "{} ignoring pgid {}, pool dne",
+                __func__,
+                pgid);
+              return seastar::make_ready_future<
+                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+              >(std::make_tuple(Ref<PG>(), startmap));
+            }
+            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+              // this ensures we do not process old creating messages after the
+              // pool's initial pgs have been created (and pg are subsequently
+              // allowed to split or merge).
+              logger().debug(
+                "{} dropping {} create, pool does not have CREATING flag set",
+                __func__,
+                pgid);
+              return seastar::make_ready_future<
+                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+              >(std::make_tuple(Ref<PG>(), startmap));
+            }
+          }
+          return make_pg(shard_services, startmap, pgid, true).then(
+            [startmap=std::move(startmap)](auto pg) mutable {
+              return seastar::make_ready_future<
+                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+              >(std::make_tuple(std::move(pg), std::move(startmap)));
+            });
+        }).then([this, &shard_manager, &shard_services, &info](auto&& ret) ->
+          seastar::future<Ref<PG>> {
+          auto [pg, startmap] = std::move(ret);
+          if (!pg)
+            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+          int up_primary, acting_primary;
+          vector<int> up, acting;
+          startmap->pg_to_up_acting_osds(
+            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+          int role = startmap->calc_pg_role(
+            pg_shard_t(whoami, info->pgid.shard),
+            acting);
+
+          PeeringCtx rctx;
+          create_pg_collection(
+            rctx.transaction,
+            info->pgid,
+            info->pgid.get_split_bits(pp->get_pg_num()));
+          init_pg_ondisk(
+            rctx.transaction,
+            info->pgid,
+            pp);
+
+          pg->init(
+            role,
+            up,
+            up_primary,
+            acting,
+            acting_primary,
+            info->history,
+            info->past_intervals,
+            rctx.transaction);
+
+          return shard_services.start_operation<PGAdvanceMap>(
+            shard_manager, pg, pg->get_osdmap_epoch(),
+            osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
+              return seastar::make_ready_future<Ref<PG>>(pg);
+            });
+        });
+    });
+}
+
+
+seastar::future<Ref<PG>>
+CoreState::get_or_create_pg(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  if (info) {
+    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
+    if (!creating) {
+      pg_map.set_creating(pgid);
+      (void)handle_pg_create_info(
+        shard_manager, shard_services, std::move(info));
+    }
+    return std::move(fut);
+  } else {
+    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
+  }
+}
+
+seastar::future<Ref<PG>> CoreState::wait_for_pg(
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
+{
+  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
+}
+
+Ref<PG> CoreState::get_pg(spg_t pgid)
+{
+  return pg_map.get_pg(pgid);
+}
+
+seastar::future<> CoreState::load_pgs(
+  ShardServices &shard_services)
+{
+  return store.list_collections(
+  ).then([this, &shard_services](auto colls) {
+    return seastar::parallel_for_each(
+      colls,
+      [this, &shard_services](auto coll) {
+        spg_t pgid;
+        if (coll.is_pg(&pgid)) {
+          return load_pg(
+            shard_services,
+            pgid
+          ).then([pgid, this, &shard_services](auto &&pg) {
+            logger().info("load_pgs: loaded {}", pgid);
+            pg_map.pg_loaded(pgid, std::move(pg));
+            shard_services.inc_pg_num();
+            return seastar::now();
+          });
+        } else if (coll.is_temp(&pgid)) {
+          logger().warn(
+            "found temp collection on crimson osd, should be impossible: {}",
+            coll);
+          ceph_assert(0 == "temp collection on crimson osd, should be impossible");
+          return seastar::now();
+        } else {
+          logger().warn("ignoring unrecognized collection: {}", coll);
+          return seastar::now();
+        }
+      });
+  });
+}
+
+seastar::future<Ref<PG>> CoreState::load_pg(
+  ShardServices &shard_services,
+  spg_t pgid)
+{
+  logger().debug("{}: {}", __func__, pgid);
+
+  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
+    return pg_meta.get_epoch();
+  }).then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this, &shard_services] (auto&& create_map) {
+    return make_pg(shard_services, std::move(create_map), pgid, false);
+  }).then([this](Ref<PG> pg) {
+    return pg->read_state(&store).then([pg] {
+      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+    });
+  }).handle_exception([pgid](auto ep) {
+    logger().info("pg {} saw exception on load {}", pgid, ep);
+    ceph_abort("Could not load pg" == 0);
+    return seastar::make_exception_future<Ref<PG>>(ep);
+  });
+}
+
+seastar::future<> CoreState::stop_pgs()
+{
+  return seastar::parallel_for_each(
+    pg_map.get_pgs(),
+    [](auto& p) {
+      return p.second->stop();
+    });
+}
+
+std::map<pg_t, pg_stat_t> CoreState::get_pg_stats() const
+{
+  std::map<pg_t, pg_stat_t> ret;
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // todo: update reported_epoch,reported_seq,last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      ret.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return ret;
+}
+
+seastar::future<> CoreState::broadcast_map_to_pgs(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  epoch_t epoch)
+{
+  auto &pgs = pg_map.get_pgs();
+  return seastar::parallel_for_each(
+    pgs.begin(), pgs.end(),
+    [=, &shard_manager, &shard_services](auto& pg) {
+      return shard_services.start_operation<PGAdvanceMap>(
+        shard_manager, pg.second, pg.second->get_osdmap_epoch(), epoch,
+        PeeringCtx{}, false).second;
+    }).then([epoch, this] {
+      osdmap_gate.got_map(epoch);
+      return seastar::make_ready_future();
+    });
+}
+
 seastar::future<> ShardServices::dispatch_context_transaction(
   crimson::os::CollectionRef col, PeeringCtx &ctx) {
   if (ctx.transaction.empty()) {
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index a46b8bffaa2b5..1860c94dbf5d4 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -19,6 +19,7 @@
 #include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/pg_map.h"
 #include "crimson/osd/state.h"
 #include "common/AsyncReserver.h"
 
@@ -44,6 +45,8 @@ class BufferedRecoveryMessages;
 
 namespace crimson::osd {
 
+class PGShardManager;
+
 /**
  * PerShardState
  *
@@ -237,6 +240,46 @@ class CoreState : public md_config_obs_t, public OSDMapService {
                          epoch_t e, bufferlist&& bl);
   seastar::future<> store_maps(ceph::os::Transaction& t,
                               epoch_t start, Ref<MOSDMap> m);
+
+  // PGMap state
+  PGMap pg_map;
+
+  seastar::future<Ref<PG>> make_pg(
+    ShardServices &shard_services,
+    cached_map_t create_map,
+    spg_t pgid,
+    bool do_create);
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> get_or_create_pg(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    PGMap::PGCreationBlockingEvent::TriggerI&&,
+    spg_t pgid,
+    epoch_t epoch,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> wait_for_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+  Ref<PG> get_pg(spg_t pgid);
+  seastar::future<> load_pgs(ShardServices &shard_services);
+  seastar::future<Ref<PG>> load_pg(
+    ShardServices &shard_services,
+    spg_t pgid);
+  seastar::future<> stop_pgs();
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
+  seastar::future<> broadcast_map_to_pgs(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    epoch_t epoch);
+
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &pg : pg_map.get_pgs()) {
+      std::invoke(f, pg.first, pg.second);
+    }
+  }
 };
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
-- 
2.39.5
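
Note below the signature line (git-am ignores everything after "-- "): the
structural change above is easier to see stripped of the Seastar and Ceph
machinery. CoreState becomes the sole owner of pg_map, and PGShardManager
exposes only forwarding helpers, including the new with_pg, which hands the
looked-up (possibly null) PG to a callable so callers like pg_commands.cc and
handle_rep_op_reply no longer touch the map directly. The sketch that follows
is a minimal, dependency-free illustration of that pattern: the names
CoreState, PGShardManager, with_pg, pg_created, and get_num_pgs mirror the
patch, while everything else (PGRef as std::shared_ptr, std::string ids, and
synchronous returns in place of seastar futures) is a simplification for
illustration, not the actual Ceph types.

// sketch.cc -- build with: c++ -std=c++17 sketch.cc
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct PG {
  std::string id;
  bool primary = false;
};
using PGRef = std::shared_ptr<PG>;

// CoreState owns the PG map, as in the patch.
class CoreState {
public:
  PGRef get_pg(const std::string &pgid) const {
    auto it = pg_map.find(pgid);
    return it == pg_map.end() ? nullptr : it->second;
  }
  void pg_created(std::string pgid, PGRef pg) {
    pg_map.emplace(std::move(pgid), std::move(pg));
  }
  std::size_t get_num_pgs() const { return pg_map.size(); }
private:
  std::map<std::string, PGRef> pg_map;
};

// PGShardManager only forwards; callers never see pg_map itself.
class PGShardManager {
public:
  void pg_created(std::string pgid, PGRef pg) {
    core_state.pg_created(std::move(pgid), std::move(pg));
  }
  std::size_t get_num_pgs() const { return core_state.get_num_pgs(); }

  // Mirrors the patch: look up the PG and hand it (possibly null) to f,
  // so the null/stale handling lives in the caller's lambda.
  template <typename F>
  auto with_pg(const std::string &pgid, F &&f) {
    return std::invoke(std::forward<F>(f), core_state.get_pg(pgid));
  }
private:
  CoreState core_state;
};

int main() {
  PGShardManager mgr;
  mgr.pg_created("1.0", std::make_shared<PG>(PG{"1.0", true}));

  // Callers handle the "i don't have pgid" case inside the lambda, as
  // pg_commands.cc and handle_rep_op_reply do after this patch.
  mgr.with_pg("1.0", [](PGRef pg) {
    std::cout << (pg ? "found " + pg->id : "not found") << '\n';
  });
  mgr.with_pg("2.0", [](PGRef pg) {
    std::cout << (pg ? "found " + pg->id : "not found") << '\n';
  });
  return 0;
}

One design point worth noting: with_pg takes a callable rather than returning
the PG, which keeps the lookup and the use of the result in one place. That
shape stays valid when lookups later have to hop between shards, whereas the
removed OSD::get_pg accessor could not.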