core_id_t core,
typename T::IRef &&op,
F &&f) {
+ ceph_assert(op->use_count() == 1);
if (seastar::this_shard_id() == core) {
auto &target_shard_services = shard_services.local();
return std::invoke(
std::move(f),
- target_shard_services.local_state,
target_shard_services,
std::move(op));
}
/// Runs opref on the appropriate core, creating the pg as necessary.
template <typename T>
seastar::future<> run_with_pg_maybe_create(
- typename T::IRef op
+ typename T::IRef op,
+ ShardServices &target_shard_services
) {
- ceph_assert(op->use_count() == 1);
- auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(T::can_create());
- logger.debug("{}: can_create", *op);
-
- return get_pg_to_shard_mapping().maybe_create_pg(
- op->get_pgid()
- ).then([this, op = std::move(op)](auto core) mutable {
- return this->template with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](ShardServices &shard_services,
- typename T::IRef op) {
- auto &logger = crimson::get_logger(ceph_subsys_osd);
- auto &opref = *op;
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([&shard_services, &opref](
- auto &&trigger) {
- return shard_services.get_or_create_pg(
- std::move(trigger),
- opref.get_pgid(),
- std::move(opref.get_create_info())
- );
- }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
- logger.debug("{}: have_pg", opref);
- return opref.with_pg(shard_services, pgref);
- }).handle_error(
- crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
- logger.debug("{}: pg creation canceled, dropping", opref);
- return seastar::now();
- })
- ).then([op=std::move(op)] {});
- });
- });
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ auto &opref = *op;
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([&target_shard_services, &opref](auto &&trigger) {
+ return target_shard_services.get_or_create_pg(
+ std::move(trigger),
+ opref.get_pgid(),
+ std::move(opref.get_create_info())
+ );
+ }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(target_shard_services, pgref);
+ }).handle_error(
+ crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
+ logger.debug("{}: pg creation canceled, dropping", opref);
+ return seastar::now();
+ })
+ ).then([op=std::move(op)] {});
}
/// Runs opref on the appropriate core, waiting for pg as necessary
template <typename T>
seastar::future<> run_with_pg_maybe_wait(
- typename T::IRef op
+ typename T::IRef op,
+ ShardServices &target_shard_services
) {
- ceph_assert(op->use_count() == 1);
- auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(!T::can_create());
- logger.debug("{}: !can_create", *op);
-
- return get_pg_to_shard_mapping().maybe_create_pg(
- op->get_pgid()
- ).then([this, op = std::move(op)](auto core) mutable {
- return this->template with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](ShardServices &shard_services,
- typename T::IRef op) {
- auto &logger = crimson::get_logger(ceph_subsys_osd);
- auto &opref = *op;
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([&shard_services, &opref](
- auto &&trigger) {
- return shard_services.wait_for_pg(
- std::move(trigger), opref.get_pgid());
- }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
- logger.debug("{}: have_pg", opref);
- return opref.with_pg(shard_services, pgref);
- }).handle_error(
- crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
- logger.debug("{}: pg creation canceled, dropping", opref);
- return seastar::now();
- })
- ).then([op=std::move(op)] {});
- });
- });
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ auto &opref = *op;
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([&target_shard_services, &opref](auto &&trigger) {
+ return target_shard_services.wait_for_pg(
+ std::move(trigger), opref.get_pgid());
+ }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(target_shard_services, pgref);
+ }).handle_error(
+ crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
+ logger.debug("{}: pg creation canceled, dropping", opref);
+ return seastar::now();
+ })
+ ).then([op=std::move(op)] {});
}
seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
return opref.template enter_stage<>(
opref.get_connection_pipeline().get_pg);
- }).then([this, &logger, &opref, op=std::move(op)]() mutable {
- logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
- logger.debug("{}: in get_pg", opref);
- if constexpr (T::can_create()) {
- logger.debug("{}: can_create", opref);
- return run_with_pg_maybe_create<T>(std::move(op));
- } else {
- logger.debug("{}: !can_create", opref);
- return run_with_pg_maybe_wait<T>(std::move(op));
- }
+ }).then([this, &opref] {
+ return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
+ }).then([this, &logger, op=std::move(op)](auto core) mutable {
+ logger.debug("{}: can_create={}, target-core={}",
+ *op, T::can_create(), core);
+ return this->template with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [this](ShardServices &target_shard_services,
+ typename T::IRef op) {
+ if constexpr (T::can_create()) {
+ return this->template run_with_pg_maybe_create<T>(
+ std::move(op), target_shard_services);
+ } else {
+ return this->template run_with_pg_maybe_wait<T>(
+ std::move(op), target_shard_services);
+ }
+ });
});
return std::make_pair(id, std::move(fut));
}