#include "include/types.h"
#include "common/Formatter.h"
#include "crimson/osd/pg.h"
-#include "crimson/osd/pg_shard_manager.h"
#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/shard_services.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "osd/PeeringState.h"
namespace crimson::osd {
PGAdvanceMap::PGAdvanceMap(
- PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+ ShardServices &shard_services, Ref<PG> pg, epoch_t to,
PeeringCtx &&rctx, bool do_init)
- : shard_manager(shard_manager), pg(pg), from(std::nullopt), to(to),
+ : shard_services(shard_services), pg(pg), to(to),
rctx(std::move(rctx)), do_init(do_init) {}
PGAdvanceMap::~PGAdvanceMap() {}
boost::make_counting_iterator(*from + 1),
boost::make_counting_iterator(to + 1),
[this](epoch_t next_epoch) {
- return shard_manager.get_shard_services().get_map(next_epoch).then(
+ return shard_services.get_map(next_epoch).then(
[this] (cached_map_t&& next_map) {
logger().debug("{}: advancing map to {}",
*this, next_map->get_epoch());
pg->handle_activate_map(rctx);
logger().debug("{}: map activated", *this);
if (do_init) {
- shard_manager.pg_created(pg->get_pgid(), pg);
- shard_manager.get_shard_services().inc_pg_num();
+ shard_services.pg_created(pg->get_pgid(), pg);
+ shard_services.inc_pg_num();
logger().info("PGAdvanceMap::start new pg {}", *pg);
}
return seastar::when_all_succeed(
pg->get_need_up_thru()
- ? shard_manager.get_shard_services().send_alive(
+ ? shard_services.send_alive(
pg->get_same_interval_since())
: seastar::now(),
- shard_manager.get_shard_services().dispatch_context(
+ shard_services.dispatch_context(
pg->get_collection_ref(),
std::move(rctx)));
}).then_unpack([this] {
logger().debug("{}: sending pg temp", *this);
- return shard_manager.get_shard_services().send_pg_temp();
+ return shard_services.send_pg_temp();
});
}).then([this, ref=std::move(ref)] {
logger().debug("{}: complete", *this);
namespace crimson::osd {
-class PGShardManager;
+class ShardServices;
class PG;
class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
protected:
- PGShardManager &shard_manager;
+ ShardServices &shard_services;
Ref<PG> pg;
PipelineHandle handle;
public:
PGAdvanceMap(
- PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+ ShardServices &shard_services, Ref<PG> pg, epoch_t to,
PeeringCtx &&rctx, bool do_init);
~PGAdvanceMap();
pg_to_core.erase(iter);
}
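+ /// Number of PGs currently tracked by this mapping.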
+ size_t get_num_pgs() const { return pg_to_core.size(); }
+
/// Map to cores in [min_core_mapping, core_mapping_limit)
PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit) {
ceph_assert_always(min_core_mapping < core_mapping_limit);
core_to_num_pgs.emplace(i, 0);
}
}
+
private:
std::map<core_id_t, unsigned> core_to_num_pgs;
std::map<spg_t, core_id_t> pg_to_core;
// vim: ts=8 sw=2 smarttab
#include "crimson/osd/pg_shard_manager.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
namespace crimson::osd {
shard_services(osd_singleton_state, local_state)
{}
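+// List collections in the store, map each PG collection to a core via
+// pg_to_shard_mapping, and load the PG on that shard.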
+seastar::future<> PGShardManager::load_pgs()
+{
+ return osd_singleton_state.store.list_collections(
+ ).then([this](auto colls) {
+ return seastar::parallel_for_each(
+ colls,
+ [this](auto coll) {
+ spg_t pgid;
+ if (coll.is_pg(&pgid)) {
+ auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ pgid);
+ return with_remote_shard_state(
+ core,
+ [pgid](
+ PerShardState &per_shard_state,
+ ShardServices &shard_services) {
+ return shard_services.load_pg(
+ pgid
+ ).then([pgid, &per_shard_state, &shard_services](auto &&pg) {
+ logger().info("load_pgs: loaded {}", pgid);
+ per_shard_state.pg_map.pg_loaded(pgid, std::move(pg));
+ shard_services.inc_pg_num();
+ return seastar::now();
+ });
+ });
+ } else if (coll.is_temp(&pgid)) {
+ logger().warn(
+ "found temp collection on crimson osd, should be impossible: {}",
+ coll);
+ ceph_assert(0 == "temp collection on crimson osd, should be impossible");
+ return seastar::now();
+ } else {
+ logger().warn("ignoring unrecognized collection: {}", coll);
+ return seastar::now();
+ }
+ });
+ });
+}
+
+seastar::future<> PGShardManager::stop_pgs()
+{
+ return local_state.stop_pgs();
+}
+
+std::map<pg_t, pg_stat_t> PGShardManager::get_pg_stats() const
+{
+ return local_state.get_pg_stats();
+}
+
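+// Advance every PG to epoch, then signal the osdmap_gate for that epoch.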
+seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
+{
+ return local_state.broadcast_map_to_pgs(
+ shard_services, epoch
+ ).then([this, epoch] {
+ osd_singleton_state.osdmap_gate.got_map(epoch);
+ return seastar::now();
+ });
+}
+
seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
local_state.set_up_epoch(e);
return seastar::now();
seastar::future<> set_up_epoch(epoch_t e);
- FORWARD(pg_created, pg_created, osd_singleton_state.pg_map)
- auto load_pgs() {
- return osd_singleton_state.load_pgs(shard_services);
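+ /// Run f(PerShardState&, ShardServices&) on the given core; the assert
+ /// below restricts this to core 0 for now.
+ ///
+ /// Hypothetical usage (illustrative only):
+ ///   with_remote_shard_state(core, [](auto &ps, auto &ss) {
+ ///     return ss.send_pg_temp();
+ ///   });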
+ template <typename F>
+ auto with_remote_shard_state(core_id_t core, F &&f) {
+ ceph_assert(core == 0);
+ auto &local_state_ref = local_state;
+ auto &shard_services_ref = shard_services;
+ return seastar::smp::submit_to(
+ core,
+ [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+ return std::invoke(
+ std::move(f), local_state_ref, shard_services_ref);
+ });
+ }
+
+ /// Runs opref on the appropriate core, creating the pg as necessary.
+ template <typename T>
+ seastar::future<> run_with_pg_maybe_create(
+ typename T::IRef op
+ ) {
+ ceph_assert(op->use_count() == 1);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ static_assert(T::can_create());
+ logger.debug("{}: can_create", *op);
+
+ auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ op->get_pgid());
+
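+ // Hand the op off from this shard's registry; it is re-registered on the
+ // target shard inside the lambda below.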
+ local_state.registry.remove_from_registry(*op);
+ return with_remote_shard_state(
+ core,
+ [op=std::move(op)](
+ PerShardState &per_shard_state,
+ ShardServices &shard_services) mutable {
+ per_shard_state.registry.add_to_registry(*op);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ auto &opref = *op;
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([&shard_services, &opref](
+ auto &&trigger) {
+ return shard_services.get_or_create_pg(
+ std::move(trigger),
+ opref.get_pgid(), opref.get_epoch(),
+ std::move(opref.get_create_info()));
+ }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(shard_services, pgref);
+ }).then([op=std::move(op)] {});
+ });
+ }
+
+ /// Runs opref on the appropriate core, waiting for the pg as necessary.
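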
+ template <typename T>
+ seastar::future<> run_with_pg_maybe_wait(
+ typename T::IRef op
+ ) {
+ ceph_assert(op->use_count() == 1);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ static_assert(!T::can_create());
+ logger.debug("{}: !can_create", *op);
+
+ auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ op->get_pgid());
+
+ local_state.registry.remove_from_registry(*op);
+ return with_remote_shard_state(
+ core,
+ [op=std::move(op)](
+ PerShardState &per_shard_state,
+ ShardServices &shard_services) mutable {
+ per_shard_state.registry.add_to_registry(*op);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ auto &opref = *op;
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([&shard_services, &opref](
+ auto &&trigger) {
+ return shard_services.wait_for_pg(
+ std::move(trigger), opref.get_pgid());
+ }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(shard_services, pgref);
+ }).then([op=std::move(op)] {});
+ });
}
- FORWARD_TO_OSD_SINGLETON(stop_pgs)
- FORWARD_CONST(get_pg_stats, get_pg_stats, osd_singleton_state)
- FORWARD_CONST(for_each_pg, for_each_pg, osd_singleton_state)
- auto get_num_pgs() const { return osd_singleton_state.pg_map.get_pgs().size(); }
+ seastar::future<> load_pgs();
+ seastar::future<> stop_pgs();
+
+ std::map<pg_t, pg_stat_t> get_pg_stats() const;
- auto broadcast_map_to_pgs(epoch_t epoch) {
- return osd_singleton_state.broadcast_map_to_pgs(
- *this, shard_services, epoch);
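+ /// Apply f(pgid, pg) to every PG in this shard's pg_map.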
+ template <typename F>
+ void for_each_pg(F &&f) const {
+ for (auto &&pg: local_state.pg_map.get_pgs()) {
+ std::apply(f, pg);
+ }
+ }
+
+ auto get_num_pgs() const {
+ return osd_singleton_state.pg_to_shard_mapping.get_num_pgs();
}
+ seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
+
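+ /// Look up the core owning pgid and invoke f with its Ref<PG> there.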
template <typename F>
auto with_pg(spg_t pgid, F &&f) {
- return std::invoke(std::forward<F>(f), osd_singleton_state.get_pg(pgid));
+ core_id_t core = osd_singleton_state.pg_to_shard_mapping.get_pg_mapping(
+ pgid);
+ return with_remote_shard_state(
+ core,
+ [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
+ return std::invoke(
+ std::move(f),
+ local_state.pg_map.get_pg(pgid));
+ });
}
template <typename T, typename... Args>
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
logger.debug("{}: starting {}", *op, __func__);
- auto &opref = *op;
+ auto &opref = *op;
+ auto id = op->get_id();
auto fut = opref.template enter_stage<>(
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
return opref.template enter_stage<>(
opref.get_connection_pipeline().get_pg);
- }).then([this, &logger, &opref] {
+ }).then([this, &logger, &opref, op=std::move(op)]() mutable {
logger.debug("{}: in get_pg", opref);
if constexpr (T::can_create()) {
logger.debug("{}: can_create", opref);
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([this, &opref](auto &&trigger) {
- std::ignore = this; // avoid clang warning
- return osd_singleton_state.get_or_create_pg(
- *this,
- shard_services,
- std::move(trigger),
- opref.get_pgid(), opref.get_epoch(),
- std::move(opref.get_create_info()));
- });
+ return run_with_pg_maybe_create<T>(std::move(op));
} else {
logger.debug("{}: !can_create", opref);
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([this, &opref](auto &&trigger) {
- std::ignore = this; // avoid clang warning
- return osd_singleton_state.wait_for_pg(
- std::move(trigger), opref.get_pgid());
- });
+ return run_with_pg_maybe_wait<T>(std::move(op));
}
- }).then([this, &logger, &opref](Ref<PG> pgref) {
- logger.debug("{}: have_pg", opref);
- return opref.with_pg(get_shard_services(), pgref);
- }).then([op] { /* Retain refcount on op until completion */ });
-
- return std::make_pair(std::move(op), std::move(fut));
+ });
+ return std::make_pair(id, std::move(fut));
}
};
cct.get_perfcounters_collection()->add(recoverystate_perf);
}
+seastar::future<> PerShardState::stop_pgs()
+{
+ return seastar::parallel_for_each(
+ pg_map.get_pgs(),
+ [](auto& p) {
+ return p.second->stop();
+ });
+}
+
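+// Collect stats for the primary PGs on this shard.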
+std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
+{
+ std::map<pg_t, pg_stat_t> ret;
+ for (auto [pgid, pg] : pg_map.get_pgs()) {
+ if (pg->is_primary()) {
+ auto stats = pg->get_stats();
+ // todo: update reported_epoch,reported_seq,last_fresh
+ stats.reported_epoch = osdmap->get_epoch();
+ ret.emplace(pgid.pgid, std::move(stats));
+ }
+ }
+ return ret;
+}
+
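+// Start a PGAdvanceMap operation for every PG in this shard's pg_map.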
+seastar::future<> PerShardState::broadcast_map_to_pgs(
+ ShardServices &shard_services,
+ epoch_t epoch)
+{
+ auto &pgs = pg_map.get_pgs();
+ return seastar::parallel_for_each(
+ pgs.begin(), pgs.end(),
+ [=, &shard_services](auto& pg) {
+ return shard_services.start_operation<PGAdvanceMap>(
+ shard_services,
+ pg.second, epoch,
+ PeeringCtx{}, false).second;
+ });
+}
+
+Ref<PG> PerShardState::get_pg(spg_t pgid)
+{
+ return pg_map.get_pg(pgid);
+}
+
OSDSingletonState::OSDSingletonState(
int whoami,
crimson::net::Messenger &cluster_msgr,
});
}
-seastar::future<Ref<PG>> OSDSingletonState::make_pg(
- ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::make_pg(
OSDMapService::cached_map_t create_map,
spg_t pgid,
bool do_create)
{
using ec_profile_t = std::map<std::string, std::string>;
- auto get_pool_info = [create_map, pgid, this] {
+ auto get_pool_info_for_pg = [create_map, pgid, this] {
if (create_map->have_pg_pool(pgid.pool())) {
pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
std::string name = create_map->get_pool_name(pgid.pool());
std::move(ec_profile)));
} else {
// pool was deleted; grab final pg_pool_t off disk.
- return get_meta_coll().load_final_pool_info(pgid.pool());
+ return get_pool_info(pgid.pool());
}
};
auto get_collection = [pgid, do_create, this] {
const coll_t cid{pgid};
if (do_create) {
- return store.create_new_collection(cid);
+ return get_store().create_new_collection(cid);
} else {
- return store.open_collection(cid);
+ return get_store().open_collection(cid);
}
};
return seastar::when_all(
- std::move(get_pool_info),
+ std::move(get_pool_info_for_pg),
std::move(get_collection)
- ).then([&shard_services, pgid, create_map, this] (auto&& ret) {
+ ).then([pgid, create_map, this](auto &&ret) {
auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
auto coll = std::move(std::get<1>(ret).get0());
return seastar::make_ready_future<Ref<PG>>(
new PG{
pgid,
- pg_shard_t{whoami, pgid.shard},
+ pg_shard_t{local_state.whoami, pgid.shard},
std::move(coll),
std::move(pool),
std::move(name),
create_map,
- shard_services,
+ *this,
ec_profile});
});
}
-seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
std::unique_ptr<PGCreateInfo> info) {
return seastar::do_with(
std::move(info),
- [this, &shard_manager, &shard_services](auto &info)
+ [this](auto &info)
-> seastar::future<Ref<PG>> {
- return shard_services.get_map(info->epoch).then(
- [&info, &shard_services, this](OSDMapService::cached_map_t startmap)
- -> seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
+ return get_map(info->epoch).then(
+ [&info, this](cached_map_t startmap)
+ -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
const spg_t &pgid = info->pgid;
if (info->by_mon) {
int64_t pool_id = pgid.pgid.pool();
- const pg_pool_t *pool = shard_services.get_map()->get_pg_pool(pool_id);
+ const pg_pool_t *pool = get_map()->get_pg_pool(pool_id);
if (!pool) {
logger().debug(
"{} ignoring pgid {}, pool dne",
std::tuple<Ref<PG>, OSDMapService::cached_map_t>
>(std::make_tuple(Ref<PG>(), startmap));
}
- ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+ ceph_assert(get_map()->require_osd_release >=
+ ceph_release_t::octopus);
if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
// this ensures we do not process old creating messages after the
// pool's initial pgs have been created (and pg are subsequently
>(std::make_tuple(Ref<PG>(), startmap));
}
}
- return make_pg(shard_services, startmap, pgid, true).then(
- [startmap=std::move(startmap)](auto pg) mutable {
- return seastar::make_ready_future<
- std::tuple<Ref<PG>, OSDMapService::cached_map_t>
- >(std::make_tuple(std::move(pg), std::move(startmap)));
- });
- }).then([this, &shard_manager, &shard_services, &info](auto&& ret)
+ return make_pg(
+ startmap, pgid, true
+ ).then([startmap=std::move(startmap)](auto pg) mutable {
+ return seastar::make_ready_future<
+ std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+ >(std::make_tuple(std::move(pg), std::move(startmap)));
+ });
+ }).then([this, &info](auto &&ret)
->seastar::future<Ref<PG>> {
auto [pg, startmap] = std::move(ret);
if (!pg)
info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
int role = startmap->calc_pg_role(
- pg_shard_t(whoami, info->pgid.shard),
+ pg_shard_t(local_state.whoami, info->pgid.shard),
acting);
PeeringCtx rctx;
info->past_intervals,
rctx.transaction);
- return shard_services.start_operation<PGAdvanceMap>(
- shard_manager, pg, osdmap->get_epoch(), std::move(rctx), true
+ return start_operation<PGAdvanceMap>(
+ *this, pg, get_map()->get_epoch(), std::move(rctx), true
).second.then([pg=pg] {
- return seastar::make_ready_future<Ref<PG>>(pg);
+ return seastar::make_ready_future<Ref<PG>>(pg);
});
});
});
seastar::future<Ref<PG>>
-OSDSingletonState::get_or_create_pg(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
+ShardServices::get_or_create_pg(
PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
spg_t pgid,
epoch_t epoch,
std::unique_ptr<PGCreateInfo> info)
{
if (info) {
- auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
+ auto [fut, creating] = local_state.pg_map.wait_for_pg(
+ std::move(trigger), pgid);
if (!creating) {
- pg_map.set_creating(pgid);
+ local_state.pg_map.set_creating(pgid);
(void)handle_pg_create_info(
- shard_manager, shard_services, std::move(info));
+ std::move(info));
}
return std::move(fut);
} else {
- return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
+ return seastar::make_ready_future<Ref<PG>>(
+ local_state.pg_map.get_pg(pgid));
}
}
-seastar::future<Ref<PG>> OSDSingletonState::wait_for_pg(
+seastar::future<Ref<PG>> ShardServices::wait_for_pg(
PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
{
- return pg_map.wait_for_pg(std::move(trigger), pgid).first;
-}
-
-Ref<PG> OSDSingletonState::get_pg(spg_t pgid)
-{
- return pg_map.get_pg(pgid);
+ return local_state.pg_map.wait_for_pg(std::move(trigger), pgid).first;
}
-seastar::future<> OSDSingletonState::load_pgs(
- ShardServices &shard_services)
-{
- return store.list_collections(
- ).then([this, &shard_services](auto colls) {
- return seastar::parallel_for_each(
- colls,
- [this, &shard_services](auto coll) {
- spg_t pgid;
- if (coll.is_pg(&pgid)) {
- return load_pg(
- shard_services,
- pgid
- ).then([pgid, this, &shard_services](auto &&pg) {
- logger().info("load_pgs: loaded {}", pgid);
- pg_map.pg_loaded(pgid, std::move(pg));
- shard_services.inc_pg_num();
- return seastar::now();
- });
- } else if (coll.is_temp(&pgid)) {
- logger().warn(
- "found temp collection on crimson osd, should be impossible: {}",
- coll);
- ceph_assert(0 == "temp collection on crimson osd, should be impossible");
- return seastar::now();
- } else {
- logger().warn("ignoring unrecognized collection: {}", coll);
- return seastar::now();
- }
- });
- });
-}
-seastar::future<Ref<PG>> OSDSingletonState::load_pg(
- ShardServices &shard_services,
- spg_t pgid)
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
{
logger().debug("{}: {}", __func__, pgid);
- return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
+ return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
return pg_meta.get_epoch();
- }).then([&shard_services](epoch_t e) {
- return shard_services.get_map(e);
- }).then([pgid, this, &shard_services] (auto&& create_map) {
- return make_pg(shard_services, std::move(create_map), pgid, false);
+ }).then([this](epoch_t e) {
+ return get_map(e);
+ }).then([pgid, this](auto&& create_map) {
+ return make_pg(std::move(create_map), pgid, false);
}).then([this](Ref<PG> pg) {
- return pg->read_state(&store).then([pg] {
+ return pg->read_state(&get_store()).then([pg] {
return seastar::make_ready_future<Ref<PG>>(std::move(pg));
});
}).handle_exception([pgid](auto ep) {
});
}
-seastar::future<> OSDSingletonState::stop_pgs()
-{
- return seastar::parallel_for_each(
- pg_map.get_pgs(),
- [](auto& p) {
- return p.second->stop();
- });
-}
-
-std::map<pg_t, pg_stat_t> OSDSingletonState::get_pg_stats() const
-{
- std::map<pg_t, pg_stat_t> ret;
- for (auto [pgid, pg] : pg_map.get_pgs()) {
- if (pg->is_primary()) {
- auto stats = pg->get_stats();
- // todo: update reported_epoch,reported_seq,last_fresh
- stats.reported_epoch = osdmap->get_epoch();
- ret.emplace(pgid.pgid, std::move(stats));
- }
- }
- return ret;
-}
-
-seastar::future<> OSDSingletonState::broadcast_map_to_pgs(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
- epoch_t epoch)
-{
- auto &pgs = pg_map.get_pgs();
- return seastar::parallel_for_each(
- pgs.begin(), pgs.end(),
- [=, &shard_manager, &shard_services](auto& pg) {
- return shard_services.start_operation<PGAdvanceMap>(
- shard_manager, pg.second, epoch, PeeringCtx{}, false
- ).second;
- }).then([epoch, this] {
- osdmap_gate.got_map(epoch);
- return seastar::make_ready_future();
- });
-}
-
seastar::future<> ShardServices::dispatch_context_transaction(
crimson::os::CollectionRef col, PeeringCtx &ctx) {
if (ctx.transaction.empty()) {
return registry.stop();
}
+ // PGMap state
+ PGMap pg_map;
+
+ seastar::future<> stop_pgs();
+ std::map<pg_t, pg_stat_t> get_pg_stats() const;
+ seastar::future<> broadcast_map_to_pgs(
+ ShardServices &shard_services,
+ epoch_t epoch);
+
+ Ref<PG> get_pg(spg_t pgid);
+ template <typename F>
+ void for_each_pg(F &&f) const {
+ for (auto &pg : pg_map.get_pgs()) {
+ std::invoke(f, pg.first, pg.second);
+ }
+ }
+
template <typename T, typename... Args>
auto start_operation(Args&&... args) {
if (__builtin_expect(stopping, false)) {
return *meta_coll;
}
+ auto get_pool_info(int64_t poolid) {
+ return get_meta_coll().load_final_pool_info(poolid);
+ }
+
// global pg temp state
struct pg_temp_t {
std::vector<int> acting;
void requeue_pg_temp();
seastar::future<> send_pg_temp();
+ // TODO: add config to control mapping
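+ // For now every PG maps into [0, 1), i.e. core 0.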
+ PGShardMapping pg_to_shard_mapping{0, 1};
unsigned num_pgs = 0;
unsigned get_pg_num() const {
return num_pgs;
epoch_t e, bufferlist&& bl);
seastar::future<> store_maps(ceph::os::Transaction& t,
epoch_t start, Ref<MOSDMap> m);
-
- // PGMap state
- PGMap pg_map;
-
- seastar::future<Ref<PG>> make_pg(
- ShardServices &shard_services,
- cached_map_t create_map,
- spg_t pgid,
- bool do_create);
- seastar::future<Ref<PG>> handle_pg_create_info(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
- std::unique_ptr<PGCreateInfo> info);
- seastar::future<Ref<PG>> get_or_create_pg(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
- PGMap::PGCreationBlockingEvent::TriggerI&&,
- spg_t pgid,
- epoch_t epoch,
- std::unique_ptr<PGCreateInfo> info);
- seastar::future<Ref<PG>> wait_for_pg(
- PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
- Ref<PG> get_pg(spg_t pgid);
- seastar::future<> load_pgs(ShardServices &shard_services);
- seastar::future<Ref<PG>> load_pg(
- ShardServices &shard_services,
- spg_t pgid);
- seastar::future<> stop_pgs();
- std::map<pg_t, pg_stat_t> get_pg_stats() const;
- seastar::future<> broadcast_map_to_pgs(
- PGShardManager &shard_manager,
- ShardServices &shard_services,
- epoch_t epoch);
-
- template <typename F>
- void for_each_pg(F &&f) const {
- for (auto &pg : pg_map.get_pgs()) {
- std::invoke(f, pg.first, pg.second);
- }
- }
};
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
return *local_state.perf;
}
+ // Local PG Management
+ seastar::future<Ref<PG>> make_pg(
+ cached_map_t create_map,
+ spg_t pgid,
+ bool do_create);
+ seastar::future<Ref<PG>> handle_pg_create_info(
+ std::unique_ptr<PGCreateInfo> info);
+ seastar::future<Ref<PG>> get_or_create_pg(
+ PGMap::PGCreationBlockingEvent::TriggerI&&,
+ spg_t pgid,
+ epoch_t epoch,
+ std::unique_ptr<PGCreateInfo> info);
+ seastar::future<Ref<PG>> wait_for_pg(
+ PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+ seastar::future<Ref<PG>> load_pg(spg_t pgid);
+
/// Dispatch and reset ctx transaction
seastar::future<> dispatch_context_transaction(
crimson::os::CollectionRef col, PeeringCtx &ctx);
});
}
+ FORWARD_TO_OSD_SINGLETON(get_pool_info)
FORWARD_TO_OSD_SINGLETON(get_pg_num)
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
FORWARD_TO_OSD_SINGLETON(get_hb_stamps)
+ FORWARD(pg_created, pg_created, local_state.pg_map)
+
FORWARD(
maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry)
FORWARD(