seastar::future<> PGShardManager::load_pgs()
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return get_local_state().store.list_collections(
).then([this](auto colls) {
return seastar::parallel_for_each(
seastar::future<> PGShardManager::stop_pgs()
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on_all([](auto &local_service) {
return local_service.local_state.stop_pgs();
});
seastar::future<std::map<pg_t, pg_stat_t>>
PGShardManager::get_pg_stats() const
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.map_reduce0(
[](auto &local) {
return local.local_state.get_pg_stats();
seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on_all([epoch](auto &local_service) {
return local_service.local_state.broadcast_map_to_pgs(
local_service, epoch
}
seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on_all(
seastar::smp_submit_to_options{},
[e](auto &local_service) {
template <typename F>
auto with_remote_shard_state(core_id_t core, F &&f) {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on(
core, [f=std::move(f)](auto &target_shard_services) mutable {
return std::invoke(
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(T::can_create());
logger.debug("{}: can_create", *op);
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(!T::can_create());
logger.debug("{}: !can_create", *op);
template <typename T, typename... Args>
auto start_pg_operation(Args&&... args) {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto op = get_local_state().registry.create_operation<T>(
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto fut = opref.template enter_stage<>(
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation in await_active stage", opref);
return get_osd_singleton_state().osd_state.when_active();
}).then([&logger, &opref] {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation active, entering await_map", opref);
return opref.template enter_stage<>(
opref.get_connection_pipeline().await_map);
}).then([this, &logger, &opref] {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation await_map stage", opref);
using OSDMapBlockingEvent =
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
&get_shard_services());
});
}).then([&logger, &opref](auto epoch) {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
return opref.template enter_stage<>(
opref.get_connection_pipeline().get_pg);
}).then([this, &logger, &opref, op=std::move(op)]() mutable {
+ logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: in get_pg", opref);
if constexpr (T::can_create()) {
logger.debug("{}: can_create", opref);