#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include <boost/iterator/counting_iterator.hpp>
+#include <seastar/coroutine/parallel_for_each.hh>
#include "osd/PeeringState.h"
namespace {
logger().debug("{}: advancing map to {}",
*this, next_map->get_epoch());
pg->handle_advance_map(next_map, rctx);
- return seastar::now();
+ return check_for_splits(*from, next_map);
});
}).then([this] {
pg->handle_activate_map(rctx);
});
}
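+ // Compare the pool's pg_num between old_epoch and next_map; if pg_num
+ // increased and this PG is a split source, create the resulting child PGs.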
+seastar::future<> PGAdvanceMap::check_for_splits(
+ epoch_t old_epoch,
+ cached_map_t next_map)
+{
+ cached_map_t old_map = co_await shard_services.get_map(old_epoch);
+ if (!old_map->have_pg_pool(pg->get_pgid().pool())) {
+ logger().debug("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+ old_epoch);
+ co_return;
+ }
+ auto old_pg_num = old_map->get_pg_num(pg->get_pgid().pool());
+ if (!next_map->have_pg_pool(pg->get_pgid().pool())) {
+ logger().debug("{} pool doesn't exist in epoch {}", pg->get_pgid(),
+ next_map->get_epoch());
+ co_return;
+ }
+ auto new_pg_num = next_map->get_pg_num(pg->get_pgid().pool());
+ logger().debug(" pg_num change in e{} {} -> {}", next_map->get_epoch(),
+ old_pg_num, new_pg_num);
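+ // A pg_num increase means this PG may have split; is_split() collects the
+ // child pgids produced by the increase.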
+ std::set<spg_t> children;
+ if (new_pg_num && new_pg_num > old_pg_num) {
+ if (pg->pgid.is_split(
+ old_pg_num,
+ new_pg_num,
+ &children)) {
+ co_await split_pg(children, next_map);
+ }
+ }
+ co_return;
+}
+
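+ // For each child pgid: map it to a core, construct the child PG, split the
+ // parent's collection and state into it, and drive the child's creation.
+ // Finally, adjust parent and child stats to account for the split.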
+seastar::future<> PGAdvanceMap::split_pg(
+ std::set<spg_t> split_children,
+ cached_map_t next_map)
+{
+ logger().debug("{}: start", *this);
+ auto pg_epoch = next_map->get_epoch();
+ logger().debug("{}: epoch: {}", *this, pg_epoch);
+
+ co_await seastar::coroutine::parallel_for_each(
+ split_children,
+ [this, &next_map, pg_epoch] (auto child_pgid) -> seastar::future<> {
+ children_pgids.insert(child_pgid);
+
+ // Map each child pg ID to a core
+ auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id());
+ logger().debug(" PG {} mapped to {}", child_pgid.pgid, core);
+ logger().debug(" {} map epoch: {}", child_pgid.pgid, pg_epoch);
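+ // Copy the cached map so it can be moved into make_pg while next_map
+ // remains usable later in this iteration.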
+ auto map = next_map;
+ auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true);
+
+ logger().debug(" Parent PG: {}", pg->get_pgid());
+ logger().debug(" Child PG ID: {}", child_pg->get_pgid());
+ unsigned new_pg_num = next_map->get_pg_num(pg->get_pgid().pool());
+ // Depending on new_pg_num, the parent PG's collection is split.
+ // The child PG will be initialized with the resulting split collection.
+ unsigned split_bits = child_pg->get_pgid().get_split_bits(new_pg_num);
+ logger().debug(" pg num is {}, m_seed is {}, split bits is {}",
+ new_pg_num, child_pg->get_pgid().ps(), split_bits);
+
+ co_await pg->split_colls(child_pg->get_pgid(), split_bits, child_pg->get_pgid().ps(),
+ &child_pg->get_pgpool().info, rctx.transaction);
+ logger().debug(" {} split collection done", child_pg->get_pgid());
+ // Update the child PG's info from the parent PG
+ pg->split_into(child_pg->get_pgid().pgid, child_pg, split_bits);
+
+ co_await handle_split_pg_creation(child_pg, next_map);
+ split_pgs.insert(child_pg);
+ });
+
+ split_stats(split_pgs, children_pgids);
+ co_return;
+}
+
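+ // Wait for the child pg's creation while starting a PGAdvanceMap operation
+ // on it, which is required for that creation to complete.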
+seastar::future<> PGAdvanceMap::handle_split_pg_creation(
+ Ref<PG> child_pg,
+ cached_map_t next_map)
+{
+ // We must create a new Trigger instance for each pg.
+ // The BlockingEvent object, which tracks whether a pg creation is complete
+ // or still blocking, must not be shared across multiple pgs, so that each
+ // split pg's creation can be tracked separately.
+ using EventT = PGMap::PGCreationBlockingEvent;
+ using TriggerT = EventT::Trigger<PGAdvanceMap>;
+ EventT event;
+ TriggerT trigger(event, *this);
+ // create_split_pg waits for the pg to be created.
+ // Completing the creation depends on running the PGAdvanceMap operation on
+ // that pg. Therefore, we must start PGAdvanceMap before awaiting the future
+ // returned by create_split_pg; otherwise we would wait indefinitely, as the
+ // pg creation cannot complete without PGAdvanceMap being executed.
+ // See ShardServices::get_or_create_pg for a similar dependency, where
+ // handle_pg_create_info is called before the pg creation future is returned.
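+ // If the creation gets canceled, resolve to a null PG reference instead of
+ // propagating the error.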
+ auto fut = shard_services.create_split_pg(
+ std::move(trigger), child_pg->get_pgid()
+ ).handle_error(crimson::ct_error::ecanceled::handle([](auto) {
+ return seastar::make_ready_future<Ref<PG>>();
+ }));
+ co_await shard_services.start_operation<PGAdvanceMap>(
+ child_pg, shard_services, next_map->get_epoch(),
+ std::move(rctx), true).second;
+ co_await std::move(fut);
+}
+
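+ // start_split_stats() yields one stat entry per child plus a trailing entry
+ // for the parent; apply each to its PG via finish_split_stats().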
+void PGAdvanceMap::split_stats(std::set<Ref<PG>> children_pgs,
+ const std::set<spg_t> &children_pgids)
+{
+ std::vector<object_stat_sum_t> updated_stats;
+ pg->start_split_stats(children_pgids, &updated_stats);
+ std::vector<object_stat_sum_t>::iterator stat_iter = updated_stats.begin();
+ for (std::set<Ref<PG>>::const_iterator iter = children_pgs.begin();
+ iter != children_pgs.end();
+ ++iter, ++stat_iter) {
+ (*iter)->finish_split_stats(*stat_iter, rctx.transaction);
+ }
+ pg->finish_split_stats(*stat_iter, rctx.transaction);
+}
+
}
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/osd/osdmap_service.h"
#include "osd/osd_types.h"
#include "crimson/common/type_helpers.h"
PeeringCtx rctx;
const bool do_init;
+ // For splitting
+ std::set<spg_t> children_pgids;
+ std::set<Ref<PG>> split_pgs;
+
public:
PGAdvanceMap(
Ref<PG> pg, ShardServices &shard_services, epoch_t to,
seastar::future<> start();
PipelineHandle &get_handle() { return handle; }
+ using cached_map_t = OSDMapService::cached_map_t;
+ seastar::future<> check_for_splits(epoch_t old_epoch,
+ cached_map_t next_map);
+ seastar::future<> split_pg(std::set<spg_t> split_children,
+ cached_map_t next_map);
+ void split_stats(std::set<Ref<PG>> children_pgs,
+ const std::set<spg_t> &children_pgids);
+
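+ // PGMap::PGCreationBlockingEvent covers the split child creations
+ // initiated by this operation (see handle_split_pg_creation).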
std::tuple<
- PGPeeringPipeline::Process::BlockingEvent
+ PGPeeringPipeline::Process::BlockingEvent,
+ PGMap::PGCreationBlockingEvent
> tracking_events;
epoch_t get_epoch_sent_at() const {
private:
PGPeeringPipeline &peering_pp(PG &pg);
+ seastar::future<> handle_split_pg_creation(
+ Ref<PG> child_pg,
+ cached_map_t next_map);
};
}