return osd_singleton_state.local();
}
auto &get_shard_services() {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.local();
}
auto &get_shard_services() const {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.local();
}
auto &get_local_state() { return get_shard_services().local_state; }
template <typename F>
auto with_remote_shard_state(core_id_t core, F &&f) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on(
core, [f=std::move(f)](auto &target_shard_services) mutable {
return std::invoke(
core_id_t core,
typename T::IRef &&op,
F &&f) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
if (seastar::this_shard_id() == core) {
auto &target_shard_services = shard_services.local();
return std::invoke(
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(T::can_create());
logger.debug("{}: can_create", *op);
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(!T::can_create());
logger.debug("{}: !can_create", *op);
template <typename T, typename... Args>
auto start_pg_operation(Args&&... args) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto op = get_local_state().registry.create_operation<T>(
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto fut = opref.template enter_stage<>(
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation in await_active stage", opref);
- return get_osd_singleton_state().osd_state.when_active();
+ return osd_singleton_state.invoke_on(PRIMARY_CORE, []
+ (auto& primary_osd_singleton_state) {
+ return primary_osd_singleton_state.osd_state.when_active();
+ });
}).then([&logger, &opref] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation active, entering await_map", opref);
return opref.template enter_stage<>(
opref.get_connection_pipeline().await_map);
}).then([this, &logger, &opref] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation await_map stage", opref);
using OSDMapBlockingEvent =
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
return opref.template with_blocking_event<OSDMapBlockingEvent>(
[this, &opref](auto &&trigger) {
std::ignore = this;
- return get_osd_singleton_state().osdmap_gate.wait_for_map(
- std::move(trigger),
- opref.get_epoch(),
- &get_shard_services());
+ return osd_singleton_state.invoke_on(PRIMARY_CORE,
+ [trigger = std::move(trigger), epoch = opref.get_epoch(),
+ &shard_services = shard_services]
+ (auto& primary_osd_singleton_state) mutable {
+ return primary_osd_singleton_state.osdmap_gate.wait_for_map(
+ std::move(trigger),
+ epoch,
+ &shard_services.local());
});
+ });
}).then([&logger, &opref](auto epoch) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
return opref.template enter_stage<>(
opref.get_connection_pipeline().get_pg);
}).then([this, &logger, &opref, op=std::move(op)]() mutable {
logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: in get_pg", opref);
if constexpr (T::can_create()) {
logger.debug("{}: can_create", opref);