From 7c19667d852211d27dba7a48cc8aa45af47ec45b Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 23 Apr 2025 13:21:23 +0000 Subject: [PATCH] crimson/osd/osd_operations/pg_advance_map: Add splitting as a function As we initiate pg splitting as part of the PGAdvanceMap workflow, it is not required to maintain it as a separate osd_operation. A new function in PGAdvanceMap - split_pg(), will now take care of the splitting workflow if we detect split children in an OSD map. Since we do not follow the same queuing system as classical OSD in crimson, we will not need to maintain pg_num_history. This makes the splitting check simpler. With most of the splitting code being part of PGAdvanceMap, it makes sense to have the splitting check there as well and leave broadcast_map_to_pgs untouched. Signed-off-by: Aishwarya Mathuria --- .../osd/osd_operations/pg_advance_map.cc | 123 +++++++++++++++++- .../osd/osd_operations/pg_advance_map.h | 20 ++- src/crimson/osd/shard_services.cc | 12 +- src/crimson/osd/shard_services.h | 1 - 4 files changed, 148 insertions(+), 8 deletions(-) diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index c8fa56adb1a8b..04c0e97ac4e83 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -11,6 +11,7 @@ #include "crimson/osd/osd_operations/pg_advance_map.h" #include "crimson/osd/osd_operation_external_tracking.h" #include +#include #include "osd/PeeringState.h" namespace { @@ -97,7 +98,7 @@ seastar::future<> PGAdvanceMap::start() logger().debug("{}: advancing map to {}", *this, next_map->get_epoch()); pg->handle_advance_map(next_map, rctx); - return seastar::now(); + return check_for_splits(*from, next_map); }); }).then([this] { pg->handle_activate_map(rctx); @@ -117,4 +118,124 @@ seastar::future<> PGAdvanceMap::start() }); } +seastar::future<> PGAdvanceMap::check_for_splits( + epoch_t old_epoch, + cached_map_t next_map) +{ + using cached_map_t = OSDMapService::cached_map_t; + cached_map_t old_map = co_await shard_services.get_map(old_epoch); + if (!old_map->have_pg_pool(pg->get_pgid().pool())) { + logger().debug("{} pool doesn't exist in epoch {}", pg->get_pgid(), + old_epoch); + co_return; + } + auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool()); + if (!next_map->have_pg_pool(pg->get_pgid().pool())) { + logger().debug("{} pool doesn't exist in epoch {}", pg->get_pgid(), + next_map->get_epoch()); + co_return; + } + auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool()); + logger().debug(" pg_num change in e{} {} -> {}", next_map->get_epoch(), + old_pg_num, new_pg_num); + std::set children; + if (new_pg_num && new_pg_num > old_pg_num) { + if (pg->pgid.is_split( + old_pg_num, + new_pg_num, + &children)) { + co_await split_pg(children, next_map); + } + } + co_return; +} + + +seastar::future<> PGAdvanceMap::split_pg( + std::set split_children, + cached_map_t next_map) +{ + logger().debug("{}: start", *this); + auto pg_epoch = next_map->get_epoch(); + logger().debug("{}: epoch: {}", *this, pg_epoch); + + co_await seastar::coroutine::parallel_for_each(split_children, [this, &next_map, + pg_epoch] (auto child_pgid) -> seastar::future<> { + children_pgids.insert(child_pgid); + + // Map each child pg ID to a core + auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id()); + logger().debug(" PG {} mapped to {}", child_pgid.pgid, core); + logger().debug(" {} map epoch: {}", child_pgid.pgid, pg_epoch); + auto map = next_map; + auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true); + + logger().debug(" Parent PG: {}", pg->get_pgid()); + logger().debug(" Child PG ID: {}", child_pg->get_pgid()); + unsigned new_pg_num = next_map->get_pg_num(pg->get_pgid().pool()); + // Depending on the new_pg_num the parent PG's collection is split. + // The child PG will be initiated with this split collection. + unsigned split_bits = child_pg->get_pgid().get_split_bits(new_pg_num); + logger().debug(" pg num is {}, m_seed is {}, split bits is {}", + new_pg_num, child_pg->get_pgid().ps(), split_bits); + + co_await pg->split_colls(child_pg->get_pgid(), split_bits, child_pg->get_pgid().ps(), + &child_pg->get_pgpool().info, rctx.transaction); + logger().debug(" {} split collection done", child_pg->get_pgid()); + // Update the child PG's info from the parent PG + pg->split_into(child_pg->get_pgid().pgid, child_pg, split_bits); + + co_await handle_split_pg_creation(child_pg, next_map); + split_pgs.insert(child_pg); + }); + + split_stats(split_pgs, children_pgids); + co_return; +} + +seastar::future<> PGAdvanceMap::handle_split_pg_creation( + Ref child_pg, + cached_map_t next_map) +{ + // We must create a new Trigger instance for each pg. + // The BlockingEvent object which tracks whether a pg creation is complete + // or still blocking, shouldn't be used across multiple pgs so we can track + // each splitted pg creation separately. + using EventT = PGMap::PGCreationBlockingEvent; + using TriggerT = EventT::Trigger; + EventT event; + TriggerT trigger(event, *this); + // create_split_pg waits for the pg to be created. + // The completion of the creation depends on running PGAdvanceMap operation on that pg + // Therefore, we must invoke PGAdvanceMap before awaiting the future returned by create_split_pg. + // Otherwise, we would be waiting indefinitely, as the pg creation + // cannot complete without PGAdvanceMap being executed. + // See: ShardServices::get_or_create_pg for similar dependency + // when calling handle_pg_create_info before returning a pg creation future. + auto fut = shard_services.create_split_pg(std::move(trigger), + child_pg->get_pgid()).handle_error(crimson::ct_error::ecanceled::handle([](auto) { + return seastar::make_ready_future>(); + })); + co_await shard_services.start_operation( + child_pg, shard_services, next_map->get_epoch(), + std::move(rctx), true).second; + co_await std::move(fut); +} + + +void PGAdvanceMap::split_stats(std::set> children_pgs, + const std::set &children_pgids) +{ + std::vector updated_stats; + pg->start_split_stats(children_pgids, &updated_stats); + std::vector::iterator stat_iter = updated_stats.begin(); + for (std::set>::const_iterator iter = children_pgs.begin(); + iter != children_pgs.end(); + ++iter, ++stat_iter) { + (*iter)->finish_split_stats(*stat_iter, rctx.transaction); + } + pg->finish_split_stats(*stat_iter, rctx.transaction); +} + + } diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h index 21702f6ff4f76..9d06e646c3d50 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.h +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -8,6 +8,8 @@ #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/pg_map.h" +#include "crimson/osd/osdmap_service.h" #include "osd/osd_types.h" #include "crimson/common/type_helpers.h" @@ -35,6 +37,10 @@ protected: PeeringCtx rctx; const bool do_init; + // For splitting + std::set children_pgids; + std::set> split_pgs; + public: PGAdvanceMap( Ref pg, ShardServices &shard_services, epoch_t to, @@ -46,8 +52,17 @@ public: seastar::future<> start(); PipelineHandle &get_handle() { return handle; } + using cached_map_t = OSDMapService::cached_map_t; + seastar::future<> check_for_splits(epoch_t old_epoch, + cached_map_t next_map); + seastar::future<> split_pg(std::set split_children, + cached_map_t next_map); + void split_stats(std::set> child_pgs, + const std::set &child_pgids); + std::tuple< - PGPeeringPipeline::Process::BlockingEvent + PGPeeringPipeline::Process::BlockingEvent, + PGMap::PGCreationBlockingEvent > tracking_events; epoch_t get_epoch_sent_at() const { @@ -56,6 +71,9 @@ public: private: PGPeeringPipeline &peering_pp(PG &pg); + seastar::future<> handle_split_pg_creation( + Ref child_pg, + cached_map_t next_map); }; } diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 1ca67547584ca..0058b4462f414 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -94,7 +94,7 @@ seastar::future<> PerShardState::broadcast_map_to_pgs( auto &pgs = pg_map.get_pgs(); return seastar::parallel_for_each( pgs.begin(), pgs.end(), - [this, &shard_services, epoch](auto& pg) { + [&shard_services, epoch](auto& pg) { return shard_services.start_operation( pg.second, shard_services, @@ -744,10 +744,12 @@ ShardServices::create_split_pg( PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid) { - auto fut = local_state.pg_map.wait_for_pg( - std::move(trigger), pgid).first; - local_state.pg_map.set_creating(pgid); - return fut; + auto [fut, existed] = local_state.pg_map.wait_for_pg( + std::move(trigger), pgid); + if (!existed) { + local_state.pg_map.set_creating(pgid); + } + return std::move(fut); } seastar::future> ShardServices::load_pg(spg_t pgid) diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index e9ccc74546fb8..f5a1f2c74ef30 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -127,7 +127,6 @@ class PerShardState { ShardServices &shard_services, epoch_t epoch); - Ref get_pg(spg_t pgid); template void for_each_pg(F &&f) const { -- 2.39.5